Skip to main content

camel_core/lifecycle/adapters/
event_journal.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::sync::OnceLock;
6
7use async_trait::async_trait;
8use tokio::fs::{self, OpenOptions};
9use tokio::io::AsyncWriteExt;
10use tokio::sync::Mutex;
11
12use camel_api::CamelError;
13
14use crate::lifecycle::domain::RuntimeEvent;
15use crate::lifecycle::ports::RuntimeEventJournalPort;
16
17static JOURNAL_LOCKS: OnceLock<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> =
18    OnceLock::new();
19const DEFAULT_COMMAND_COMPACTION_THRESHOLD_BYTES: u64 = 256 * 1024;
20
21fn lock_for_path(path: &Path) -> Arc<Mutex<()>> {
22    let map = JOURNAL_LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()));
23    let mut guard = map
24        .lock()
25        .expect("runtime event journal lock map poisoned unexpectedly");
26    guard
27        .entry(path.to_path_buf())
28        .or_insert_with(|| Arc::new(Mutex::new(())))
29        .clone()
30}
31
32#[derive(Clone)]
33pub struct FileRuntimeEventJournal {
34    path: PathBuf,
35    command_path: PathBuf,
36    write_lock: Arc<Mutex<()>>,
37    command_compaction_threshold_bytes: u64,
38}
39
40impl FileRuntimeEventJournal {
41    pub fn new(path: impl Into<PathBuf>) -> Result<Self, CamelError> {
42        Self::with_compaction_threshold(path, DEFAULT_COMMAND_COMPACTION_THRESHOLD_BYTES)
43    }
44
45    fn with_compaction_threshold(
46        path: impl Into<PathBuf>,
47        command_compaction_threshold_bytes: u64,
48    ) -> Result<Self, CamelError> {
49        let path = path.into();
50        if let Some(parent) = path.parent() {
51            std::fs::create_dir_all(parent)?;
52        }
53        std::fs::OpenOptions::new()
54            .create(true)
55            .append(true)
56            .open(&path)?;
57        let command_path = path.with_extension(
58            path.extension()
59                .map(|ext| format!("{}{}", ext.to_string_lossy(), ".cmdids"))
60                .unwrap_or_else(|| "cmdids".to_string()),
61        );
62        std::fs::OpenOptions::new()
63            .create(true)
64            .append(true)
65            .open(&command_path)?;
66        let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
67
68        Ok(Self {
69            path,
70            command_path,
71            write_lock: lock_for_path(&canonical_path),
72            command_compaction_threshold_bytes,
73        })
74    }
75
76    fn parse_seen_command_ids(data: &str) -> HashSet<String> {
77        let mut seen = HashSet::new();
78        for raw_line in data.lines() {
79            let line = raw_line.trim();
80            if line.is_empty() {
81                continue;
82            }
83            if let Some(rest) = line.strip_prefix('+') {
84                seen.insert(rest.to_string());
85                continue;
86            }
87            if let Some(rest) = line.strip_prefix('-') {
88                seen.remove(rest);
89                continue;
90            }
91            // Backward compatibility for older plain-line format.
92            seen.insert(line.to_string());
93        }
94        seen
95    }
96
97    async fn compact_command_log_if_needed(&self) -> Result<(), CamelError> {
98        let metadata = match fs::metadata(&self.command_path).await {
99            Ok(metadata) => metadata,
100            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
101            Err(err) => return Err(CamelError::from(err)),
102        };
103
104        if metadata.len() < self.command_compaction_threshold_bytes {
105            return Ok(());
106        }
107
108        let data = match fs::read_to_string(&self.command_path).await {
109            Ok(content) => content,
110            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
111            Err(err) => return Err(CamelError::from(err)),
112        };
113
114        let seen = Self::parse_seen_command_ids(&data);
115        let mut ids: Vec<String> = seen.into_iter().collect();
116        ids.sort();
117
118        let mut compacted = String::new();
119        for id in ids {
120            compacted.push('+');
121            compacted.push_str(&id);
122            compacted.push('\n');
123        }
124        fs::write(&self.command_path, compacted).await?;
125        Ok(())
126    }
127}
128
129#[async_trait]
130impl RuntimeEventJournalPort for FileRuntimeEventJournal {
131    async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
132        if events.is_empty() {
133            return Ok(());
134        }
135
136        let _guard = self.write_lock.lock().await;
137        let mut file = OpenOptions::new()
138            .create(true)
139            .append(true)
140            .open(&self.path)
141            .await?;
142
143        for event in events {
144            let encoded = serde_json::to_string(event).map_err(|err| {
145                CamelError::RouteError(format!("event journal encode failed: {err}"))
146            })?;
147            file.write_all(encoded.as_bytes()).await?;
148            file.write_all(b"\n").await?;
149        }
150        file.flush().await?;
151        Ok(())
152    }
153
154    async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
155        let _guard = self.write_lock.lock().await;
156        let data = match fs::read_to_string(&self.path).await {
157            Ok(content) => content,
158            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
159            Err(err) => return Err(CamelError::from(err)),
160        };
161
162        let mut events = Vec::new();
163        let stream = serde_json::Deserializer::from_str(&data).into_iter::<RuntimeEvent>();
164        for event in stream {
165            events.push(event.map_err(|err| {
166                CamelError::RouteError(format!("event journal decode failed: {err}"))
167            })?);
168        }
169
170        Ok(events)
171    }
172
173    async fn append_command_id(&self, command_id: &str) -> Result<(), CamelError> {
174        let _guard = self.write_lock.lock().await;
175        let mut file = OpenOptions::new()
176            .create(true)
177            .append(true)
178            .open(&self.command_path)
179            .await?;
180        file.write_all(b"+").await?;
181        file.write_all(command_id.as_bytes()).await?;
182        file.write_all(b"\n").await?;
183        file.flush().await?;
184        self.compact_command_log_if_needed().await?;
185        Ok(())
186    }
187
188    async fn remove_command_id(&self, command_id: &str) -> Result<(), CamelError> {
189        let _guard = self.write_lock.lock().await;
190        let mut file = OpenOptions::new()
191            .create(true)
192            .append(true)
193            .open(&self.command_path)
194            .await?;
195        file.write_all(b"-").await?;
196        file.write_all(command_id.as_bytes()).await?;
197        file.write_all(b"\n").await?;
198        file.flush().await?;
199        self.compact_command_log_if_needed().await?;
200        Ok(())
201    }
202
203    async fn load_command_ids(&self) -> Result<Vec<String>, CamelError> {
204        let _guard = self.write_lock.lock().await;
205        let data = match fs::read_to_string(&self.command_path).await {
206            Ok(content) => content,
207            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
208            Err(err) => return Err(CamelError::from(err)),
209        };
210
211        let seen = Self::parse_seen_command_ids(&data);
212
213        Ok(seen.into_iter().collect())
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use tempfile::tempdir;
221
222    #[tokio::test]
223    async fn file_runtime_event_journal_roundtrip() {
224        let dir = tempdir().unwrap();
225        let journal =
226            FileRuntimeEventJournal::new(dir.path().join("runtime-events.jsonl")).unwrap();
227
228        journal
229            .append_batch(&[RuntimeEvent::RouteRegistered {
230                route_id: "j1".to_string(),
231            }])
232            .await
233            .unwrap();
234
235        let events = journal.load_all().await.unwrap();
236        assert_eq!(events.len(), 1);
237        assert!(matches!(
238            &events[0],
239            RuntimeEvent::RouteRegistered { route_id } if route_id == "j1"
240        ));
241
242        journal.append_command_id("c1").await.unwrap();
243        journal.append_command_id("c2").await.unwrap();
244        journal.remove_command_id("c1").await.unwrap();
245        let command_ids = journal.load_command_ids().await.unwrap();
246        assert_eq!(command_ids, vec!["c2".to_string()]);
247    }
248
249    #[tokio::test]
250    async fn load_all_accepts_concatenated_json_values() {
251        let dir = tempdir().unwrap();
252        let path = dir.path().join("runtime-events.jsonl");
253        std::fs::write(
254            &path,
255            "{\"RouteRegistered\":{\"route_id\":\"r1\"}}{\"RouteStarted\":{\"route_id\":\"r1\"}}\n",
256        )
257        .unwrap();
258        let journal = FileRuntimeEventJournal::new(path).unwrap();
259
260        let events = journal.load_all().await.unwrap();
261        assert_eq!(events.len(), 2);
262        assert!(matches!(
263            &events[0],
264            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
265        ));
266        assert!(matches!(
267            &events[1],
268            RuntimeEvent::RouteStarted { route_id } if route_id == "r1"
269        ));
270    }
271
272    #[tokio::test]
273    async fn command_id_log_compacts_to_live_ids_only() {
274        let dir = tempdir().unwrap();
275        let journal = FileRuntimeEventJournal::with_compaction_threshold(
276            dir.path().join("runtime-events.jsonl"),
277            1,
278        )
279        .unwrap();
280
281        for i in 0..20 {
282            let cmd = format!("old-{i}");
283            journal.append_command_id(&cmd).await.unwrap();
284            journal.remove_command_id(&cmd).await.unwrap();
285        }
286        journal.append_command_id("live-1").await.unwrap();
287        journal.append_command_id("live-2").await.unwrap();
288
289        let command_ids = journal.load_command_ids().await.unwrap();
290        let mut sorted = command_ids;
291        sorted.sort();
292        assert_eq!(sorted, vec!["live-1".to_string(), "live-2".to_string()]);
293
294        let compacted = fs::read_to_string(&journal.command_path).await.unwrap();
295        let compacted_lines: Vec<&str> = compacted
296            .lines()
297            .map(str::trim)
298            .filter(|line| !line.is_empty())
299            .collect();
300        assert_eq!(compacted_lines, vec!["+live-1", "+live-2"]);
301    }
302}