camel_core/lifecycle/adapters/
event_journal.rs1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5use std::sync::OnceLock;
6
7use async_trait::async_trait;
8use tokio::fs::{self, OpenOptions};
9use tokio::io::AsyncWriteExt;
10use tokio::sync::Mutex;
11
12use camel_api::CamelError;
13
14use crate::lifecycle::domain::RuntimeEvent;
15use crate::lifecycle::ports::RuntimeEventJournalPort;
16
/// Process-wide registry of per-journal async locks, keyed by file path.
///
/// The outer `std::sync::Mutex` only guards the map itself; the values are
/// shared `tokio::sync::Mutex<()>` handles handed out by `lock_for_path`.
/// The registry is created lazily on first use and entries are never removed.
static JOURNAL_LOCKS: OnceLock<std::sync::Mutex<HashMap<PathBuf, Arc<Mutex<()>>>>> =
    OnceLock::new();
/// Size in bytes (256 KiB) that `FileRuntimeEventJournal::new` passes as the
/// command-id log compaction threshold.
const DEFAULT_COMMAND_COMPACTION_THRESHOLD_BYTES: u64 = 256 * 1024;
20
21fn lock_for_path(path: &Path) -> Arc<Mutex<()>> {
22 let map = JOURNAL_LOCKS.get_or_init(|| std::sync::Mutex::new(HashMap::new()));
23 let mut guard = map
24 .lock()
25 .expect("runtime event journal lock map poisoned unexpectedly");
26 guard
27 .entry(path.to_path_buf())
28 .or_insert_with(|| Arc::new(Mutex::new(())))
29 .clone()
30}
31
32#[derive(Clone)]
33pub struct FileRuntimeEventJournal {
34 path: PathBuf,
35 command_path: PathBuf,
36 write_lock: Arc<Mutex<()>>,
37 command_compaction_threshold_bytes: u64,
38}
39
40impl FileRuntimeEventJournal {
    /// Creates a journal at `path` using the default command-log compaction
    /// threshold (`DEFAULT_COMMAND_COMPACTION_THRESHOLD_BYTES`).
    ///
    /// # Errors
    /// Returns whatever error `with_compaction_threshold` reports.
    pub fn new(path: impl Into<PathBuf>) -> Result<Self, CamelError> {
        Self::with_compaction_threshold(path, DEFAULT_COMMAND_COMPACTION_THRESHOLD_BYTES)
    }
44
45 fn with_compaction_threshold(
46 path: impl Into<PathBuf>,
47 command_compaction_threshold_bytes: u64,
48 ) -> Result<Self, CamelError> {
49 let path = path.into();
50 if let Some(parent) = path.parent() {
51 std::fs::create_dir_all(parent)?;
52 }
53 std::fs::OpenOptions::new()
54 .create(true)
55 .append(true)
56 .open(&path)?;
57 let command_path = path.with_extension(
58 path.extension()
59 .map(|ext| format!("{}{}", ext.to_string_lossy(), ".cmdids"))
60 .unwrap_or_else(|| "cmdids".to_string()),
61 );
62 std::fs::OpenOptions::new()
63 .create(true)
64 .append(true)
65 .open(&command_path)?;
66 let canonical_path = std::fs::canonicalize(&path).unwrap_or(path.clone());
67
68 Ok(Self {
69 path,
70 command_path,
71 write_lock: lock_for_path(&canonical_path),
72 command_compaction_threshold_bytes,
73 })
74 }
75
76 fn parse_seen_command_ids(data: &str) -> HashSet<String> {
77 let mut seen = HashSet::new();
78 for raw_line in data.lines() {
79 let line = raw_line.trim();
80 if line.is_empty() {
81 continue;
82 }
83 if let Some(rest) = line.strip_prefix('+') {
84 seen.insert(rest.to_string());
85 continue;
86 }
87 if let Some(rest) = line.strip_prefix('-') {
88 seen.remove(rest);
89 continue;
90 }
91 seen.insert(line.to_string());
93 }
94 seen
95 }
96
97 async fn compact_command_log_if_needed(&self) -> Result<(), CamelError> {
98 let metadata = match fs::metadata(&self.command_path).await {
99 Ok(metadata) => metadata,
100 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
101 Err(err) => return Err(CamelError::from(err)),
102 };
103
104 if metadata.len() < self.command_compaction_threshold_bytes {
105 return Ok(());
106 }
107
108 let data = match fs::read_to_string(&self.command_path).await {
109 Ok(content) => content,
110 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
111 Err(err) => return Err(CamelError::from(err)),
112 };
113
114 let seen = Self::parse_seen_command_ids(&data);
115 let mut ids: Vec<String> = seen.into_iter().collect();
116 ids.sort();
117
118 let mut compacted = String::new();
119 for id in ids {
120 compacted.push('+');
121 compacted.push_str(&id);
122 compacted.push('\n');
123 }
124 fs::write(&self.command_path, compacted).await?;
125 Ok(())
126 }
127}
128
129#[async_trait]
130impl RuntimeEventJournalPort for FileRuntimeEventJournal {
131 async fn append_batch(&self, events: &[RuntimeEvent]) -> Result<(), CamelError> {
132 if events.is_empty() {
133 return Ok(());
134 }
135
136 let _guard = self.write_lock.lock().await;
137 let mut file = OpenOptions::new()
138 .create(true)
139 .append(true)
140 .open(&self.path)
141 .await?;
142
143 for event in events {
144 let encoded = serde_json::to_string(event).map_err(|err| {
145 CamelError::RouteError(format!("event journal encode failed: {err}"))
146 })?;
147 file.write_all(encoded.as_bytes()).await?;
148 file.write_all(b"\n").await?;
149 }
150 file.flush().await?;
151 Ok(())
152 }
153
154 async fn load_all(&self) -> Result<Vec<RuntimeEvent>, CamelError> {
155 let _guard = self.write_lock.lock().await;
156 let data = match fs::read_to_string(&self.path).await {
157 Ok(content) => content,
158 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
159 Err(err) => return Err(CamelError::from(err)),
160 };
161
162 let mut events = Vec::new();
163 let stream = serde_json::Deserializer::from_str(&data).into_iter::<RuntimeEvent>();
164 for event in stream {
165 events.push(event.map_err(|err| {
166 CamelError::RouteError(format!("event journal decode failed: {err}"))
167 })?);
168 }
169
170 Ok(events)
171 }
172
173 async fn append_command_id(&self, command_id: &str) -> Result<(), CamelError> {
174 let _guard = self.write_lock.lock().await;
175 let mut file = OpenOptions::new()
176 .create(true)
177 .append(true)
178 .open(&self.command_path)
179 .await?;
180 file.write_all(b"+").await?;
181 file.write_all(command_id.as_bytes()).await?;
182 file.write_all(b"\n").await?;
183 file.flush().await?;
184 self.compact_command_log_if_needed().await?;
185 Ok(())
186 }
187
188 async fn remove_command_id(&self, command_id: &str) -> Result<(), CamelError> {
189 let _guard = self.write_lock.lock().await;
190 let mut file = OpenOptions::new()
191 .create(true)
192 .append(true)
193 .open(&self.command_path)
194 .await?;
195 file.write_all(b"-").await?;
196 file.write_all(command_id.as_bytes()).await?;
197 file.write_all(b"\n").await?;
198 file.flush().await?;
199 self.compact_command_log_if_needed().await?;
200 Ok(())
201 }
202
203 async fn load_command_ids(&self) -> Result<Vec<String>, CamelError> {
204 let _guard = self.write_lock.lock().await;
205 let data = match fs::read_to_string(&self.command_path).await {
206 Ok(content) => content,
207 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
208 Err(err) => return Err(CamelError::from(err)),
209 };
210
211 let seen = Self::parse_seen_command_ids(&data);
212
213 Ok(seen.into_iter().collect())
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Events and command ids written through the journal can be read back.
    #[tokio::test]
    async fn file_runtime_event_journal_roundtrip() {
        let workspace = tempdir().unwrap();
        let journal_file = workspace.path().join("runtime-events.jsonl");
        let journal = FileRuntimeEventJournal::new(journal_file).unwrap();

        let registered = RuntimeEvent::RouteRegistered {
            route_id: "j1".to_string(),
        };
        journal.append_batch(&[registered]).await.unwrap();

        let replayed = journal.load_all().await.unwrap();
        assert_eq!(replayed.len(), 1);
        assert!(matches!(
            &replayed[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "j1"
        ));

        for id in ["c1", "c2"] {
            journal.append_command_id(id).await.unwrap();
        }
        journal.remove_command_id("c1").await.unwrap();

        let live = journal.load_command_ids().await.unwrap();
        assert_eq!(live, vec!["c2".to_string()]);
    }

    /// Two JSON documents on the same line are still decoded as two events.
    #[tokio::test]
    async fn load_all_accepts_concatenated_json_values() {
        let workspace = tempdir().unwrap();
        let journal_file = workspace.path().join("runtime-events.jsonl");
        std::fs::write(
            &journal_file,
            "{\"RouteRegistered\":{\"route_id\":\"r1\"}}{\"RouteStarted\":{\"route_id\":\"r1\"}}\n",
        )
        .unwrap();

        let journal = FileRuntimeEventJournal::new(journal_file).unwrap();
        let replayed = journal.load_all().await.unwrap();

        assert_eq!(replayed.len(), 2);
        assert!(matches!(
            &replayed[0],
            RuntimeEvent::RouteRegistered { route_id } if route_id == "r1"
        ));
        assert!(matches!(
            &replayed[1],
            RuntimeEvent::RouteStarted { route_id } if route_id == "r1"
        ));
    }

    /// With a 1-byte threshold every write triggers compaction, so only the
    /// ids that were never removed may remain on disk.
    #[tokio::test]
    async fn command_id_log_compacts_to_live_ids_only() {
        let workspace = tempdir().unwrap();
        let journal_file = workspace.path().join("runtime-events.jsonl");
        let journal = FileRuntimeEventJournal::with_compaction_threshold(journal_file, 1).unwrap();

        for index in 0..20 {
            let stale = format!("old-{index}");
            journal.append_command_id(&stale).await.unwrap();
            journal.remove_command_id(&stale).await.unwrap();
        }
        for live in ["live-1", "live-2"] {
            journal.append_command_id(live).await.unwrap();
        }

        let mut remaining = journal.load_command_ids().await.unwrap();
        remaining.sort();
        assert_eq!(remaining, vec!["live-1".to_string(), "live-2".to_string()]);

        let on_disk = fs::read_to_string(&journal.command_path).await.unwrap();
        let records: Vec<&str> = on_disk
            .lines()
            .map(str::trim)
            .filter(|record| !record.is_empty())
            .collect();
        assert_eq!(records, vec!["+live-1", "+live-2"]);
    }
}