1use std::{
2 fs,
3 io::{BufRead, BufReader, BufWriter, Write},
4 path::{Path, PathBuf},
5 process::{Child, ChildStdin, Command, Stdio},
6 sync::Arc,
7};
8
9use capnp::message::Builder;
10use capnp::serialize;
11use parking_lot::Mutex;
12use serde_json;
13use tracing::{debug, info, warn};
14
15use crate::config::ProcessPluginConfig;
16use crate::error::{EventError, Result};
17use crate::plugin::registry::{InstalledPluginRecord, load_registry, registry_path};
18use crate::plugin_capnp;
19use crate::schema::AggregateSchema;
20use crate::store::{AggregateState, EventRecord};
21
22use super::{Plugin, PluginDelivery};
23
24const CHANNEL_LABEL_STDOUT: &str = "stdout";
25const CHANNEL_LABEL_STDERR: &str = "stderr";
26
27fn sanitize_identifier(identifier: &str) -> String {
28 identifier
29 .chars()
30 .map(|ch| {
31 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
32 ch
33 } else {
34 '_'
35 }
36 })
37 .collect()
38}
39
40pub fn status_file_path(data_dir: &Path, identifier: &str) -> PathBuf {
41 data_dir
42 .join("plugins")
43 .join("run")
44 .join(format!("{}.pid", sanitize_identifier(identifier)))
45}
46
47pub struct ProcessPlugin {
48 connection: Arc<ProcessConnection>,
49}
50
51impl ProcessPlugin {
52 pub fn new(identifier: String, config: ProcessPluginConfig, data_dir: &Path) -> Result<Self> {
53 let registry_path = registry_path(data_dir);
54 let registry = load_registry(®istry_path)?;
55 let target = current_target_triple();
56 let record = registry
57 .iter()
58 .find(|entry| {
59 entry.name == config.name
60 && entry.version == config.version
61 && entry.target == target
62 })
63 .cloned()
64 .ok_or_else(|| {
65 EventError::Config(format!(
66 "plugin {}@{} is not installed for target {}",
67 config.name, config.version, target
68 ))
69 })?;
70
71 let status_path = status_file_path(data_dir, &identifier);
72 let connection = ProcessConnection::new(identifier, config, record, status_path)?;
73 connection.ensure_started()?;
74
75 Ok(Self {
76 connection: Arc::new(connection),
77 })
78 }
79}
80
81struct ProcessConnection {
82 identifier: String,
83 config: ProcessPluginConfig,
84 record: InstalledPluginRecord,
85 status_path: PathBuf,
86 inner: Mutex<Option<ProcessInner>>,
87}
88
89struct ProcessInner {
90 child: Child,
91 writer: BufWriter<ChildStdin>,
92 sequence: u64,
93}
94
95impl ProcessConnection {
96 fn new(
97 identifier: String,
98 config: ProcessPluginConfig,
99 record: InstalledPluginRecord,
100 status_path: PathBuf,
101 ) -> Result<Self> {
102 let connection = Self {
103 identifier,
104 config,
105 record,
106 status_path,
107 inner: Mutex::new(None),
108 };
109 if let Err(err) = connection.clear_status_file() {
110 warn!(
111 target: "eventdbx.plugin",
112 "failed to clear stale status file for {}: {}",
113 connection.identifier,
114 err
115 );
116 }
117 Ok(connection)
118 }
119
120 fn ensure_started(&self) -> Result<()> {
121 let mut guard = self.inner.lock();
122 self.ensure_running(&mut guard)?;
123 if guard.is_none() {
124 self.restart(&mut guard)?;
125 }
126 Ok(())
127 }
128
129 fn send_event(
130 &self,
131 record: &EventRecord,
132 state: &AggregateState,
133 schema: Option<&AggregateSchema>,
134 ) -> Result<()> {
135 let mut guard = self.inner.lock();
136 self.ensure_running(&mut guard)?;
137 if guard.is_none() {
138 self.restart(&mut guard)?;
139 }
140
141 let inner = guard
142 .as_mut()
143 .ok_or_else(|| EventError::Storage("plugin process failed to start".into()))?;
144
145 if let Err(err) = self.write_event(inner, record, state, schema) {
146 warn!(
147 target: "eventdbx.plugin",
148 "failed to deliver event to plugin {} ({}), attempting restart: {}",
149 self.identifier,
150 self.config.name,
151 err
152 );
153 self.restart(&mut guard)?;
154 let inner = guard
155 .as_mut()
156 .ok_or_else(|| EventError::Storage("plugin process failed to restart".into()))?;
157 self.write_event(inner, record, state, schema)?;
158 }
159 Ok(())
160 }
161
162 fn ensure_running(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
163 if let Some(inner) = guard.as_mut() {
164 if let Ok(Some(status)) = inner.child.try_wait() {
165 warn!(
166 target: "eventdbx.plugin",
167 "plugin {} exited with status {}",
168 self.identifier,
169 status
170 );
171 if let Err(err) = self.clear_status_file() {
172 warn!(
173 target: "eventdbx.plugin",
174 "failed to clear status file for {}: {}",
175 self.identifier,
176 err
177 );
178 }
179 *guard = None;
180 }
181 }
182 Ok(())
183 }
184
185 fn restart(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
186 if let Err(err) = self.clear_status_file() {
187 warn!(
188 target: "eventdbx.plugin",
189 "failed to clear status file for {} during restart: {}",
190 self.identifier,
191 err
192 );
193 }
194 let mut inner = self.spawn_process()?;
195 self.write_init(&mut inner)?;
196 *guard = Some(inner);
197 Ok(())
198 }
199
200 fn spawn_process(&self) -> Result<ProcessInner> {
201 let mut command = Command::new(&self.record.binary_path);
202 if !self.config.args.is_empty() {
203 command.args(&self.config.args);
204 }
205 for (key, value) in &self.config.env {
206 command.env(key, value);
207 }
208 if let Some(dir) = &self.config.working_dir {
209 command.current_dir(dir);
210 } else {
211 command.current_dir(&self.record.install_dir);
212 }
213 command
214 .stdin(Stdio::piped())
215 .stdout(Stdio::piped())
216 .stderr(Stdio::piped());
217
218 let mut child = command.spawn().map_err(|err| {
219 EventError::Storage(format!(
220 "failed to launch plugin {}: {}",
221 self.identifier, err
222 ))
223 })?;
224
225 let stdin = child.stdin.take().ok_or_else(|| {
226 EventError::Storage(format!("plugin {} did not provide stdin", self.identifier))
227 })?;
228
229 if let Some(stdout) = child.stdout.take() {
230 spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDOUT, stdout);
231 }
232
233 if let Some(stderr) = child.stderr.take() {
234 spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDERR, stderr);
235 }
236
237 let writer = BufWriter::new(stdin);
238
239 if let Err(err) = self.write_status_file(child.id()) {
240 warn!(
241 target: "eventdbx.plugin",
242 "failed to write status file for {}: {}",
243 self.identifier,
244 err
245 );
246 }
247
248 Ok(ProcessInner {
249 child,
250 writer,
251 sequence: 0,
252 })
253 }
254
255 fn write_init(&self, inner: &mut ProcessInner) -> Result<()> {
256 let mut message = Builder::new_default();
257 {
258 let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
259 let mut message_builder = envelope.reborrow().init_message();
260 let mut init = message_builder.reborrow().init_init();
261 init.set_plugin_name(&self.config.name);
262 init.set_version(&self.config.version);
263 init.set_target(&self.record.target);
264 }
265 serialize::write_message(&mut inner.writer, &message)
266 .map_err(|err| EventError::Serialization(err.to_string()))?;
267 inner.writer.flush()?;
268 Ok(())
269 }
270
271 fn write_event(
272 &self,
273 inner: &mut ProcessInner,
274 record: &EventRecord,
275 state: &AggregateState,
276 schema: Option<&AggregateSchema>,
277 ) -> Result<()> {
278 let event_id = record.metadata.event_id.to_string();
279 let payload_json = serde_json::to_string(&record.payload)?;
280 let metadata_json = serde_json::to_string(&record.metadata)?;
281 let schema_json = match schema {
282 Some(schema) => Some(serde_json::to_string(schema)?),
283 None => None,
284 };
285 let extensions_json = match &record.extensions {
286 Some(value) => Some(serde_json::to_string(value)?),
287 None => None,
288 };
289
290 let mut message = Builder::new_default();
291 {
292 let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
293 let mut union_builder = envelope.reborrow().init_message();
294 let mut event = union_builder.reborrow().init_event();
295 event.set_sequence(inner.sequence);
296 event.set_aggregate_type(&record.aggregate_type);
297 event.set_aggregate_id(&record.aggregate_id);
298 event.set_event_type(&record.event_type);
299 event.set_event_version(record.version);
300 event.set_event_id(&event_id);
301 event.set_created_at_epoch_micros(record.metadata.created_at.timestamp_micros());
302 event.set_payload_json(&payload_json);
303 event.set_metadata_json(&metadata_json);
304 match extensions_json {
305 Some(ref json) => event.set_extensions_json(json),
306 None => event.set_extensions_json("null"),
307 }
308 event.set_hash(&record.hash);
309 event.set_merkle_root(&record.merkle_root);
310
311 event.set_state_version(state.version);
312 event.set_state_archived(state.archived);
313 event.set_state_merkle_root(&state.merkle_root);
314
315 match schema_json {
316 Some(ref json) => event.set_schema_json(json),
317 None => event.set_schema_json("null"),
318 }
319
320 let mut entries = event.init_state_entries(state.state.len() as u32);
321 for (idx, (key, value)) in state.state.iter().enumerate() {
322 let mut entry = entries.reborrow().get(idx as u32);
323 entry.set_key(key);
324 entry.set_value(value);
325 }
326 }
327
328 serialize::write_message(&mut inner.writer, &message)
329 .map_err(|err| EventError::Serialization(err.to_string()))?;
330 inner.writer.flush()?;
331 inner.sequence = inner.sequence.wrapping_add(1);
332 Ok(())
333 }
334
335 fn write_status_file(&self, pid: u32) -> std::io::Result<()> {
336 if let Some(parent) = self.status_path.parent() {
337 fs::create_dir_all(parent)?;
338 }
339 fs::write(&self.status_path, format!("{pid}\n"))
340 }
341
342 fn clear_status_file(&self) -> std::io::Result<()> {
343 match fs::remove_file(&self.status_path) {
344 Ok(()) => Ok(()),
345 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
346 Err(err) => Err(err),
347 }
348 }
349}
350
351impl Drop for ProcessConnection {
352 fn drop(&mut self) {
353 if let Err(err) = self.clear_status_file() {
354 warn!(
355 target: "eventdbx.plugin",
356 "failed to clear status file for {} on drop: {}",
357 self.identifier,
358 err
359 );
360 }
361 }
362}
363
364impl Plugin for ProcessPlugin {
365 fn name(&self) -> &'static str {
366 "process"
367 }
368
369 fn notify_event(&self, delivery: PluginDelivery<'_>) -> Result<()> {
370 let Some(record) = delivery.record else {
371 return Ok(());
372 };
373 let Some(state) = delivery.state else {
374 return Ok(());
375 };
376 self.connection.send_event(record, state, delivery.schema)
377 }
378}
379
380fn spawn_stream_logger<R>(identifier: String, channel: &'static str, reader: R)
381where
382 R: std::io::Read + Send + 'static,
383{
384 std::thread::spawn(move || {
385 let mut buf_reader = BufReader::new(reader);
386 let mut line = String::new();
387 loop {
388 line.clear();
389 match buf_reader.read_line(&mut line) {
390 Ok(0) => break,
391 Ok(_) => {
392 let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
393 if !trimmed.is_empty() {
394 info!(
395 target: "eventdbx.plugin",
396 "{} {}: {}",
397 identifier,
398 channel,
399 trimmed
400 );
401 }
402 }
403 Err(err) => {
404 debug!(
405 target: "eventdbx.plugin",
406 "error reading {} from plugin {}: {}",
407 channel,
408 identifier,
409 err
410 );
411 break;
412 }
413 }
414 }
415 });
416}
417
418fn current_target_triple() -> String {
419 format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH)
420}