eventdbx/plugin/
process.rs

1use std::{
2    fs,
3    io::{BufRead, BufReader, BufWriter, Write},
4    path::{Path, PathBuf},
5    process::{Child, ChildStdin, Command, Stdio},
6    sync::Arc,
7};
8
9use capnp::message::Builder;
10use capnp::serialize;
11use parking_lot::Mutex;
12use serde_json;
13use tracing::{debug, info, warn};
14
15use crate::config::ProcessPluginConfig;
16use crate::error::{EventError, Result};
17use crate::plugin::registry::{InstalledPluginRecord, load_registry, registry_path};
18use crate::plugin_capnp;
19use crate::schema::AggregateSchema;
20use crate::store::{AggregateState, EventRecord};
21
22use super::{Plugin, PluginDelivery};
23
24const CHANNEL_LABEL_STDOUT: &str = "stdout";
25const CHANNEL_LABEL_STDERR: &str = "stderr";
26
27fn sanitize_identifier(identifier: &str) -> String {
28    identifier
29        .chars()
30        .map(|ch| {
31            if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
32                ch
33            } else {
34                '_'
35            }
36        })
37        .collect()
38}
39
40pub fn status_file_path(data_dir: &Path, identifier: &str) -> PathBuf {
41    data_dir
42        .join("plugins")
43        .join("run")
44        .join(format!("{}.pid", sanitize_identifier(identifier)))
45}
46
47pub struct ProcessPlugin {
48    connection: Arc<ProcessConnection>,
49}
50
51impl ProcessPlugin {
52    pub fn new(identifier: String, config: ProcessPluginConfig, data_dir: &Path) -> Result<Self> {
53        let registry_path = registry_path(data_dir);
54        let registry = load_registry(&registry_path)?;
55        let target = current_target_triple();
56        let record = registry
57            .iter()
58            .find(|entry| {
59                entry.name == config.name
60                    && entry.version == config.version
61                    && entry.target == target
62            })
63            .cloned()
64            .ok_or_else(|| {
65                EventError::Config(format!(
66                    "plugin {}@{} is not installed for target {}",
67                    config.name, config.version, target
68                ))
69            })?;
70
71        let status_path = status_file_path(data_dir, &identifier);
72        let connection = ProcessConnection::new(identifier, config, record, status_path)?;
73        connection.ensure_started()?;
74
75        Ok(Self {
76            connection: Arc::new(connection),
77        })
78    }
79}
80
81struct ProcessConnection {
82    identifier: String,
83    config: ProcessPluginConfig,
84    record: InstalledPluginRecord,
85    status_path: PathBuf,
86    inner: Mutex<Option<ProcessInner>>,
87}
88
89struct ProcessInner {
90    child: Child,
91    writer: BufWriter<ChildStdin>,
92    sequence: u64,
93}
94
95impl ProcessConnection {
96    fn new(
97        identifier: String,
98        config: ProcessPluginConfig,
99        record: InstalledPluginRecord,
100        status_path: PathBuf,
101    ) -> Result<Self> {
102        let connection = Self {
103            identifier,
104            config,
105            record,
106            status_path,
107            inner: Mutex::new(None),
108        };
109        if let Err(err) = connection.clear_status_file() {
110            warn!(
111                target: "eventdbx.plugin",
112                "failed to clear stale status file for {}: {}",
113                connection.identifier,
114                err
115            );
116        }
117        Ok(connection)
118    }
119
120    fn ensure_started(&self) -> Result<()> {
121        let mut guard = self.inner.lock();
122        self.ensure_running(&mut guard)?;
123        if guard.is_none() {
124            self.restart(&mut guard)?;
125        }
126        Ok(())
127    }
128
129    fn send_event(
130        &self,
131        record: &EventRecord,
132        state: &AggregateState,
133        schema: Option<&AggregateSchema>,
134    ) -> Result<()> {
135        let mut guard = self.inner.lock();
136        self.ensure_running(&mut guard)?;
137        if guard.is_none() {
138            self.restart(&mut guard)?;
139        }
140
141        let inner = guard
142            .as_mut()
143            .ok_or_else(|| EventError::Storage("plugin process failed to start".into()))?;
144
145        if let Err(err) = self.write_event(inner, record, state, schema) {
146            warn!(
147                target: "eventdbx.plugin",
148                "failed to deliver event to plugin {} ({}), attempting restart: {}",
149                self.identifier,
150                self.config.name,
151                err
152            );
153            self.restart(&mut guard)?;
154            let inner = guard
155                .as_mut()
156                .ok_or_else(|| EventError::Storage("plugin process failed to restart".into()))?;
157            self.write_event(inner, record, state, schema)?;
158        }
159        Ok(())
160    }
161
162    fn ensure_running(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
163        if let Some(inner) = guard.as_mut() {
164            if let Ok(Some(status)) = inner.child.try_wait() {
165                warn!(
166                    target: "eventdbx.plugin",
167                    "plugin {} exited with status {}",
168                    self.identifier,
169                    status
170                );
171                if let Err(err) = self.clear_status_file() {
172                    warn!(
173                        target: "eventdbx.plugin",
174                        "failed to clear status file for {}: {}",
175                        self.identifier,
176                        err
177                    );
178                }
179                *guard = None;
180            }
181        }
182        Ok(())
183    }
184
185    fn restart(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
186        if let Err(err) = self.clear_status_file() {
187            warn!(
188                target: "eventdbx.plugin",
189                "failed to clear status file for {} during restart: {}",
190                self.identifier,
191                err
192            );
193        }
194        let mut inner = self.spawn_process()?;
195        self.write_init(&mut inner)?;
196        *guard = Some(inner);
197        Ok(())
198    }
199
200    fn spawn_process(&self) -> Result<ProcessInner> {
201        let mut command = Command::new(&self.record.binary_path);
202        if !self.config.args.is_empty() {
203            command.args(&self.config.args);
204        }
205        for (key, value) in &self.config.env {
206            command.env(key, value);
207        }
208        if let Some(dir) = &self.config.working_dir {
209            command.current_dir(dir);
210        } else {
211            command.current_dir(&self.record.install_dir);
212        }
213        command
214            .stdin(Stdio::piped())
215            .stdout(Stdio::piped())
216            .stderr(Stdio::piped());
217
218        let mut child = command.spawn().map_err(|err| {
219            EventError::Storage(format!(
220                "failed to launch plugin {}: {}",
221                self.identifier, err
222            ))
223        })?;
224
225        let stdin = child.stdin.take().ok_or_else(|| {
226            EventError::Storage(format!("plugin {} did not provide stdin", self.identifier))
227        })?;
228
229        if let Some(stdout) = child.stdout.take() {
230            spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDOUT, stdout);
231        }
232
233        if let Some(stderr) = child.stderr.take() {
234            spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDERR, stderr);
235        }
236
237        let writer = BufWriter::new(stdin);
238
239        if let Err(err) = self.write_status_file(child.id()) {
240            warn!(
241                target: "eventdbx.plugin",
242                "failed to write status file for {}: {}",
243                self.identifier,
244                err
245            );
246        }
247
248        Ok(ProcessInner {
249            child,
250            writer,
251            sequence: 0,
252        })
253    }
254
255    fn write_init(&self, inner: &mut ProcessInner) -> Result<()> {
256        let mut message = Builder::new_default();
257        {
258            let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
259            let mut message_builder = envelope.reborrow().init_message();
260            let mut init = message_builder.reborrow().init_init();
261            init.set_plugin_name(&self.config.name);
262            init.set_version(&self.config.version);
263            init.set_target(&self.record.target);
264        }
265        serialize::write_message(&mut inner.writer, &message)
266            .map_err(|err| EventError::Serialization(err.to_string()))?;
267        inner.writer.flush()?;
268        Ok(())
269    }
270
271    fn write_event(
272        &self,
273        inner: &mut ProcessInner,
274        record: &EventRecord,
275        state: &AggregateState,
276        schema: Option<&AggregateSchema>,
277    ) -> Result<()> {
278        let event_id = record.metadata.event_id.to_string();
279        let payload_json = serde_json::to_string(&record.payload)?;
280        let metadata_json = serde_json::to_string(&record.metadata)?;
281        let schema_json = match schema {
282            Some(schema) => Some(serde_json::to_string(schema)?),
283            None => None,
284        };
285        let extensions_json = match &record.extensions {
286            Some(value) => Some(serde_json::to_string(value)?),
287            None => None,
288        };
289
290        let mut message = Builder::new_default();
291        {
292            let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
293            let mut union_builder = envelope.reborrow().init_message();
294            let mut event = union_builder.reborrow().init_event();
295            event.set_sequence(inner.sequence);
296            event.set_aggregate_type(&record.aggregate_type);
297            event.set_aggregate_id(&record.aggregate_id);
298            event.set_event_type(&record.event_type);
299            event.set_event_version(record.version);
300            event.set_event_id(&event_id);
301            event.set_created_at_epoch_micros(record.metadata.created_at.timestamp_micros());
302            event.set_payload_json(&payload_json);
303            event.set_metadata_json(&metadata_json);
304            match extensions_json {
305                Some(ref json) => event.set_extensions_json(json),
306                None => event.set_extensions_json("null"),
307            }
308            event.set_hash(&record.hash);
309            event.set_merkle_root(&record.merkle_root);
310
311            event.set_state_version(state.version);
312            event.set_state_archived(state.archived);
313            event.set_state_merkle_root(&state.merkle_root);
314
315            match schema_json {
316                Some(ref json) => event.set_schema_json(json),
317                None => event.set_schema_json("null"),
318            }
319
320            let mut entries = event.init_state_entries(state.state.len() as u32);
321            for (idx, (key, value)) in state.state.iter().enumerate() {
322                let mut entry = entries.reborrow().get(idx as u32);
323                entry.set_key(key);
324                entry.set_value(value);
325            }
326        }
327
328        serialize::write_message(&mut inner.writer, &message)
329            .map_err(|err| EventError::Serialization(err.to_string()))?;
330        inner.writer.flush()?;
331        inner.sequence = inner.sequence.wrapping_add(1);
332        Ok(())
333    }
334
335    fn write_status_file(&self, pid: u32) -> std::io::Result<()> {
336        if let Some(parent) = self.status_path.parent() {
337            fs::create_dir_all(parent)?;
338        }
339        fs::write(&self.status_path, format!("{pid}\n"))
340    }
341
342    fn clear_status_file(&self) -> std::io::Result<()> {
343        match fs::remove_file(&self.status_path) {
344            Ok(()) => Ok(()),
345            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
346            Err(err) => Err(err),
347        }
348    }
349}
350
351impl Drop for ProcessConnection {
352    fn drop(&mut self) {
353        if let Err(err) = self.clear_status_file() {
354            warn!(
355                target: "eventdbx.plugin",
356                "failed to clear status file for {} on drop: {}",
357                self.identifier,
358                err
359            );
360        }
361    }
362}
363
364impl Plugin for ProcessPlugin {
365    fn name(&self) -> &'static str {
366        "process"
367    }
368
369    fn notify_event(&self, delivery: PluginDelivery<'_>) -> Result<()> {
370        let Some(record) = delivery.record else {
371            return Ok(());
372        };
373        let Some(state) = delivery.state else {
374            return Ok(());
375        };
376        self.connection.send_event(record, state, delivery.schema)
377    }
378}
379
380fn spawn_stream_logger<R>(identifier: String, channel: &'static str, reader: R)
381where
382    R: std::io::Read + Send + 'static,
383{
384    std::thread::spawn(move || {
385        let mut buf_reader = BufReader::new(reader);
386        let mut line = String::new();
387        loop {
388            line.clear();
389            match buf_reader.read_line(&mut line) {
390                Ok(0) => break,
391                Ok(_) => {
392                    let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
393                    if !trimmed.is_empty() {
394                        info!(
395                            target: "eventdbx.plugin",
396                            "{} {}: {}",
397                            identifier,
398                            channel,
399                            trimmed
400                        );
401                    }
402                }
403                Err(err) => {
404                    debug!(
405                        target: "eventdbx.plugin",
406                        "error reading {} from plugin {}: {}",
407                        channel,
408                        identifier,
409                        err
410                    );
411                    break;
412                }
413            }
414        }
415    });
416}
417
418fn current_target_triple() -> String {
419    format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH)
420}