1use crate::event::{Event, EventEmitter};
8use async_trait::async_trait;
9use chrono::{DateTime, NaiveDate, Utc};
10use serde::{Deserialize, Serialize};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::AsyncWriteExt;
16use tokio::sync::RwLock;
17
/// An [`EventEmitter`] that also stores the events it receives and can read them back.
#[async_trait]
pub trait EventPersister: EventEmitter {
    /// Returns stored events whose timestamp lies between `from` and `to`.
    ///
    /// When `event_type_filter` is `Some`, only events of that type are returned.
    ///
    /// NOTE(review): bound inclusivity is left to the implementor; the file-backed
    /// implementation in this file includes both endpoints.
    async fn query_events(
        &self,
        from: DateTime<Utc>,
        to: DateTime<Utc>,
        event_type_filter: Option<&str>,
    ) -> crate::Result<Vec<Event>>;

    /// Counts stored events, restricted to `event_type` when it is `Some`.
    async fn count_events(&self, event_type: Option<&str>) -> crate::Result<u64>;

    /// Removes stored data older than `older_than` and returns how much was removed.
    ///
    /// NOTE(review): the unit of the returned count is implementation-defined; the
    /// file-backed implementation in this file counts deleted files, not events.
    async fn cleanup(&self, older_than: chrono::Duration) -> crate::Result<u64>;

    /// Pushes any buffered events to the underlying storage.
    async fn flush(&self) -> crate::Result<()>;
}
38
/// Condition under which the file persister switches to a new event file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RotationPolicy {
    /// Rotate when the current UTC date differs from the date of the open file.
    Daily,
    /// Rotate once the tracked size of the open file reaches `max_bytes`.
    SizeBased {
        /// Size threshold in bytes (rotation triggers at `>= max_bytes`).
        max_bytes: u64,
    },
    /// Rotate when either the date changes or the size threshold is reached.
    Both {
        /// Size threshold in bytes (rotation triggers at `>= max_bytes`).
        max_bytes: u64,
    },
}
55
/// Settings for [`FileEventPersister`].
#[derive(Debug, Clone)]
pub struct FileEventPersisterConfig {
    /// Directory that holds the `events-*.jsonl` files; created on construction if missing.
    pub directory: PathBuf,
    /// Size in bytes consulted during rotation to decide whether an already existing
    /// file for the day still has room to be appended to.
    pub max_file_size_bytes: u64,
    /// When to switch to a new file.
    pub rotation: RotationPolicy,
    /// Intended retention period in days.
    /// NOTE(review): not read by the code visible in this module — confirm where it is used.
    pub retention_days: u32,
    /// Intended interval between flushes.
    /// NOTE(review): not read by the code visible in this module — confirm where it is used.
    pub flush_interval: Duration,
    /// When `false`, `emit` and `emit_batch` return immediately without writing.
    pub enabled: bool,
}
72
73impl Default for FileEventPersisterConfig {
74 fn default() -> Self {
75 Self {
76 directory: PathBuf::from("./events"),
77 max_file_size_bytes: 104_857_600, rotation: RotationPolicy::Both {
79 max_bytes: 104_857_600,
80 },
81 retention_days: 30,
82 flush_interval: Duration::from_secs(1),
83 enabled: true,
84 }
85 }
86}
87
88impl FileEventPersisterConfig {
89 #[must_use]
91 pub fn with_directory(mut self, directory: impl Into<PathBuf>) -> Self {
92 self.directory = directory.into();
93 self
94 }
95
96 #[must_use]
98 pub fn with_max_file_size(mut self, max_bytes: u64) -> Self {
99 self.max_file_size_bytes = max_bytes;
100 self
101 }
102
103 #[must_use]
105 pub fn with_rotation(mut self, rotation: RotationPolicy) -> Self {
106 self.rotation = rotation;
107 self
108 }
109
110 #[must_use]
112 pub fn with_retention_days(mut self, days: u32) -> Self {
113 self.retention_days = days;
114 self
115 }
116
117 #[must_use]
119 pub fn with_flush_interval(mut self, interval: Duration) -> Self {
120 self.flush_interval = interval;
121 self
122 }
123
124 #[must_use]
126 pub fn with_enabled(mut self, enabled: bool) -> Self {
127 self.enabled = enabled;
128 self
129 }
130}
131
/// Event emitter/persister that appends events as JSON lines to files named
/// `events-<YYYY-MM-DD>[.<index>].jsonl` inside the configured directory.
pub struct FileEventPersister {
    /// Settings supplied at construction.
    config: FileEventPersisterConfig,
    /// Buffered writer for the file currently being appended to; `None` while no file is open.
    current_file: Arc<RwLock<Option<tokio::io::BufWriter<tokio::fs::File>>>>,
    /// Tracked size in bytes of the current file: its length when opened plus bytes written since.
    current_file_size: Arc<AtomicU64>,
    /// UTC date the current file belongs to.
    current_file_date: Arc<RwLock<NaiveDate>>,
    /// Running count of events written through this instance.
    events_written: Arc<AtomicU64>,
}
144
145impl FileEventPersister {
146 pub async fn new(config: FileEventPersisterConfig) -> crate::Result<Self> {
151 tokio::fs::create_dir_all(&config.directory).await?;
152
153 let today = Utc::now().date_naive();
154 let persister = Self {
155 config,
156 current_file: Arc::new(RwLock::new(None)),
157 current_file_size: Arc::new(AtomicU64::new(0)),
158 current_file_date: Arc::new(RwLock::new(today)),
159 events_written: Arc::new(AtomicU64::new(0)),
160 };
161
162 persister.ensure_file().await?;
163 Ok(persister)
164 }
165
166 #[must_use]
168 pub fn events_written(&self) -> u64 {
169 self.events_written.load(Ordering::Relaxed)
170 }
171
172 async fn ensure_file(&self) -> crate::Result<()> {
174 let today = Utc::now().date_naive();
175 let mut file_guard = self.current_file.write().await;
176 let mut date_guard = self.current_file_date.write().await;
177
178 let needs_new_file = file_guard.is_none() || *date_guard != today;
179
180 if needs_new_file {
181 if let Some(ref mut writer) = *file_guard {
183 writer.flush().await?;
184 }
185 *file_guard = None;
186
187 *date_guard = today;
188 self.current_file_size.store(0, Ordering::Relaxed);
189
190 let path = self.file_path_for_date(today);
191 let file = tokio::fs::OpenOptions::new()
192 .create(true)
193 .append(true)
194 .open(&path)
195 .await?;
196
197 let metadata = file.metadata().await?;
198 self.current_file_size
199 .store(metadata.len(), Ordering::Relaxed);
200
201 *file_guard = Some(tokio::io::BufWriter::new(file));
202 }
203
204 Ok(())
205 }
206
207 async fn rotate_if_needed(&self) -> crate::Result<()> {
209 let needs_rotation = match &self.config.rotation {
210 RotationPolicy::Daily => {
211 let today = Utc::now().date_naive();
212 let date_guard = self.current_file_date.read().await;
213 *date_guard != today
214 }
215 RotationPolicy::SizeBased { max_bytes } => {
216 self.current_file_size.load(Ordering::Relaxed) >= *max_bytes
217 }
218 RotationPolicy::Both { max_bytes } => {
219 let today = Utc::now().date_naive();
220 let date_guard = self.current_file_date.read().await;
221 *date_guard != today || self.current_file_size.load(Ordering::Relaxed) >= *max_bytes
222 }
223 };
224
225 if needs_rotation {
226 self.perform_rotation().await?;
227 }
228
229 Ok(())
230 }
231
232 async fn perform_rotation(&self) -> crate::Result<()> {
234 let mut file_guard = self.current_file.write().await;
235
236 if let Some(ref mut writer) = *file_guard {
238 writer.flush().await?;
239 }
240 *file_guard = None;
241
242 let today = Utc::now().date_naive();
243 let mut date_guard = self.current_file_date.write().await;
244 *date_guard = today;
245
246 let mut index = 0u32;
248 loop {
249 let path = if index == 0 {
250 self.file_path_for_date(today)
251 } else {
252 self.file_path_with_index(today, index)
253 };
254
255 if !path.exists() {
256 let file = tokio::fs::OpenOptions::new()
257 .create(true)
258 .append(true)
259 .open(&path)
260 .await?;
261 self.current_file_size.store(0, Ordering::Relaxed);
262 *file_guard = Some(tokio::io::BufWriter::new(file));
263 return Ok(());
264 }
265
266 let metadata = tokio::fs::metadata(&path).await?;
268 if metadata.len() < self.config.max_file_size_bytes {
269 let file = tokio::fs::OpenOptions::new()
270 .create(true)
271 .append(true)
272 .open(&path)
273 .await?;
274 self.current_file_size
275 .store(metadata.len(), Ordering::Relaxed);
276 *file_guard = Some(tokio::io::BufWriter::new(file));
277 return Ok(());
278 }
279
280 index = index.saturating_add(1);
281 if index > 10_000 {
282 return Err(crate::CelersError::Other(
283 "Too many rotation files for a single day".to_string(),
284 ));
285 }
286 }
287 }
288
289 fn file_path_for_date(&self, date: NaiveDate) -> PathBuf {
291 self.config
292 .directory
293 .join(format!("events-{}.jsonl", date.format("%Y-%m-%d")))
294 }
295
296 fn file_path_with_index(&self, date: NaiveDate, index: u32) -> PathBuf {
298 self.config.directory.join(format!(
299 "events-{}.{}.jsonl",
300 date.format("%Y-%m-%d"),
301 index
302 ))
303 }
304
305 async fn list_event_files(&self) -> crate::Result<Vec<PathBuf>> {
307 let mut files = Vec::new();
308 let mut entries = tokio::fs::read_dir(&self.config.directory).await?;
309
310 while let Some(entry) = entries.next_entry().await? {
311 let path = entry.path();
312 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
313 if name.starts_with("events-") && name.ends_with(".jsonl") {
314 files.push(path);
315 }
316 }
317 }
318
319 files.sort();
320 Ok(files)
321 }
322
323 fn parse_file_date(path: &Path) -> Option<NaiveDate> {
325 let name = path.file_name()?.to_str()?;
326 let date_str = name.strip_prefix("events-")?;
328 let date_part = if let Some(pos) = date_str.find('.') {
329 &date_str[..pos]
330 } else {
331 date_str
332 };
333 NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok()
334 }
335
336 async fn read_events_from_file(path: &Path) -> crate::Result<Vec<Event>> {
338 let content = tokio::fs::read_to_string(path).await?;
339 let mut events = Vec::new();
340
341 for line in content.lines() {
342 let trimmed = line.trim();
343 if trimmed.is_empty() {
344 continue;
345 }
346 match serde_json::from_str::<Event>(trimmed) {
347 Ok(event) => events.push(event),
348 Err(e) => {
349 tracing::warn!("Failed to parse event line in {:?}: {}", path, e);
350 }
351 }
352 }
353
354 Ok(events)
355 }
356}
357
358#[async_trait]
359impl EventEmitter for FileEventPersister {
360 async fn emit(&self, event: Event) -> crate::Result<()> {
361 if !self.config.enabled {
362 return Ok(());
363 }
364
365 self.ensure_file().await?;
366
367 let json = serde_json::to_string(&event).map_err(|e| {
368 crate::CelersError::Serialization(format!("Failed to serialize event: {}", e))
369 })?;
370
371 let line = format!("{}\n", json);
372 let line_bytes = line.as_bytes();
373
374 {
375 let mut file_guard = self.current_file.write().await;
376 if let Some(ref mut writer) = *file_guard {
377 writer.write_all(line_bytes).await?;
378 self.current_file_size
379 .fetch_add(line_bytes.len() as u64, Ordering::Relaxed);
380 self.events_written.fetch_add(1, Ordering::Relaxed);
381 }
382 }
383
384 self.rotate_if_needed().await?;
385
386 Ok(())
387 }
388
389 async fn emit_batch(&self, events: Vec<Event>) -> crate::Result<()> {
390 if !self.config.enabled {
391 return Ok(());
392 }
393
394 self.ensure_file().await?;
395
396 let mut total_bytes = 0u64;
397 let mut lines = String::new();
398
399 for event in &events {
400 let json = serde_json::to_string(event).map_err(|e| {
401 crate::CelersError::Serialization(format!("Failed to serialize event: {}", e))
402 })?;
403 lines.push_str(&json);
404 lines.push('\n');
405 }
406
407 let line_bytes = lines.as_bytes();
408 total_bytes += line_bytes.len() as u64;
409
410 {
411 let mut file_guard = self.current_file.write().await;
412 if let Some(ref mut writer) = *file_guard {
413 writer.write_all(line_bytes).await?;
414 writer.flush().await?;
415 }
416 }
417
418 self.current_file_size
419 .fetch_add(total_bytes, Ordering::Relaxed);
420 self.events_written
421 .fetch_add(events.len() as u64, Ordering::Relaxed);
422
423 self.rotate_if_needed().await?;
424
425 Ok(())
426 }
427
428 fn is_enabled(&self) -> bool {
429 self.config.enabled
430 }
431}
432
433#[async_trait]
434impl EventPersister for FileEventPersister {
435 async fn query_events(
436 &self,
437 from: DateTime<Utc>,
438 to: DateTime<Utc>,
439 event_type_filter: Option<&str>,
440 ) -> crate::Result<Vec<Event>> {
441 self.flush().await?;
443
444 let from_date = from.date_naive();
445 let to_date = to.date_naive();
446
447 let files = self.list_event_files().await?;
448 let mut result = Vec::new();
449
450 for file_path in &files {
451 if let Some(file_date) = Self::parse_file_date(file_path) {
453 if file_date < from_date || file_date > to_date {
454 continue;
455 }
456 }
457
458 let events = Self::read_events_from_file(file_path).await?;
459
460 for event in events {
461 let ts = event.timestamp();
462 if ts >= from && ts <= to {
463 if let Some(filter) = event_type_filter {
464 if event.event_type() == filter {
465 result.push(event);
466 }
467 } else {
468 result.push(event);
469 }
470 }
471 }
472 }
473
474 Ok(result)
475 }
476
477 async fn count_events(&self, event_type: Option<&str>) -> crate::Result<u64> {
478 self.flush().await?;
479
480 let files = self.list_event_files().await?;
481 let mut count = 0u64;
482
483 for file_path in &files {
484 let events = Self::read_events_from_file(file_path).await?;
485 for event in &events {
486 if let Some(filter) = event_type {
487 if event.event_type() == filter {
488 count += 1;
489 }
490 } else {
491 count += 1;
492 }
493 }
494 }
495
496 Ok(count)
497 }
498
499 async fn cleanup(&self, older_than: chrono::Duration) -> crate::Result<u64> {
500 let cutoff = Utc::now()
501 .date_naive()
502 .checked_sub_signed(older_than)
503 .ok_or_else(|| {
504 crate::CelersError::Other("Invalid duration for cleanup cutoff".to_string())
505 })?;
506
507 let files = self.list_event_files().await?;
508 let mut removed = 0u64;
509
510 for file_path in &files {
511 if let Some(file_date) = Self::parse_file_date(file_path) {
512 if file_date < cutoff {
513 tokio::fs::remove_file(file_path).await?;
514 removed += 1;
515 }
516 }
517 }
518
519 Ok(removed)
520 }
521
522 async fn flush(&self) -> crate::Result<()> {
523 let mut file_guard = self.current_file.write().await;
524 if let Some(ref mut writer) = *file_guard {
525 writer.flush().await?;
526 }
527 Ok(())
528 }
529}
530
// Tests for the file-backed persister. Each async test works in its own uniquely
// named directory under the system temp dir and removes it (best effort) at the end.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::{TaskEvent, WorkerEvent};
    use uuid::Uuid;

    /// Builds a `TaskEvent::Started` for `name` with a fresh task id and the current time.
    fn make_task_event(name: &str) -> Event {
        Event::Task(TaskEvent::Started {
            task_id: Uuid::new_v4(),
            task_name: name.to_string(),
            hostname: "test-worker".to_string(),
            timestamp: Utc::now(),
            pid: 1234,
        })
    }

    /// Builds a `WorkerEvent::Online` stamped with the current time.
    fn make_worker_event() -> Event {
        Event::Worker(WorkerEvent::Online {
            hostname: "test-worker".to_string(),
            timestamp: Utc::now(),
            sw_ident: "celers".to_string(),
            sw_ver: "0.2.0".to_string(),
            sw_sys: "linux".to_string(),
        })
    }

    /// The default configuration carries the documented stock values.
    #[test]
    fn test_file_persister_config_defaults() {
        let config = FileEventPersisterConfig::default();
        assert_eq!(config.max_file_size_bytes, 104_857_600);
        assert_eq!(config.retention_days, 30);
        assert_eq!(config.flush_interval, Duration::from_secs(1));
        assert!(config.enabled);
        assert_eq!(config.directory, PathBuf::from("./events"));
    }

    /// An emitted event can be read back through `query_events`.
    #[tokio::test]
    async fn test_file_persister_write_and_read() {
        let dir = std::env::temp_dir().join(format!("celers_test_wr_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(true);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        let event = make_task_event("test_task");
        persister.emit(event.clone()).await.expect("emit failed");
        persister.flush().await.expect("flush failed");

        // Query a window of one hour either side of now.
        let from = Utc::now() - chrono::Duration::hours(1);
        let to = Utc::now() + chrono::Duration::hours(1);
        let events = persister
            .query_events(from, to, None)
            .await
            .expect("query failed");

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type(), "task-started");

        // Best-effort cleanup of the temp directory.
        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// With a 100-byte limit, writing 20 events must spread them over several files.
    #[tokio::test]
    async fn test_file_persister_rotation_by_size() {
        let dir = std::env::temp_dir().join(format!("celers_test_rot_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            // Tiny limit so rotation triggers quickly.
            .with_max_file_size(100)
            .with_rotation(RotationPolicy::SizeBased { max_bytes: 100 });

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        for i in 0..20 {
            let event = make_task_event(&format!("task_{}", i));
            persister.emit(event).await.expect("emit failed");
        }
        persister.flush().await.expect("flush failed");

        let files = persister
            .list_event_files()
            .await
            .expect("list files failed");
        assert!(
            files.len() > 1,
            "Expected multiple files after rotation, got {}",
            files.len()
        );

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// `cleanup` deletes a file whose name carries a date far in the past.
    #[tokio::test]
    async fn test_file_persister_cleanup() {
        let dir = std::env::temp_dir().join(format!("celers_test_cl_{}", Uuid::new_v4()));
        tokio::fs::create_dir_all(&dir)
            .await
            .expect("create dir failed");

        // Plant a file dated 2020-01-01 before the persister is created.
        let old_date = NaiveDate::from_ymd_opt(2020, 1, 1).expect("invalid date");
        let old_file = dir.join(format!("events-{}.jsonl", old_date.format("%Y-%m-%d")));
        tokio::fs::write(&old_file, "{}\n")
            .await
            .expect("write old file failed");

        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(true);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        let removed = persister
            .cleanup(chrono::Duration::days(1))
            .await
            .expect("cleanup failed");

        assert!(removed >= 1, "Expected at least 1 file removed");

        assert!(!old_file.exists());

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// `query_events` honours the event-type filter.
    #[tokio::test]
    async fn test_file_persister_query_with_filter() {
        let dir = std::env::temp_dir().join(format!("celers_test_filt_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(true);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        // Two task events and one worker event.
        persister
            .emit(make_task_event("my_task"))
            .await
            .expect("emit task failed");
        persister
            .emit(make_worker_event())
            .await
            .expect("emit worker failed");
        persister
            .emit(make_task_event("another_task"))
            .await
            .expect("emit task2 failed");
        persister.flush().await.expect("flush failed");

        let from = Utc::now() - chrono::Duration::hours(1);
        let to = Utc::now() + chrono::Duration::hours(1);

        let task_events = persister
            .query_events(from, to, Some("task-started"))
            .await
            .expect("query failed");
        assert_eq!(task_events.len(), 2);

        let worker_events = persister
            .query_events(from, to, Some("worker-online"))
            .await
            .expect("query failed");
        assert_eq!(worker_events.len(), 1);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// `count_events` reports totals with and without a type filter.
    #[tokio::test]
    async fn test_file_persister_count() {
        let dir = std::env::temp_dir().join(format!("celers_test_cnt_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(true);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        persister
            .emit(make_task_event("t1"))
            .await
            .expect("emit failed");
        persister
            .emit(make_task_event("t2"))
            .await
            .expect("emit failed");
        persister
            .emit(make_worker_event())
            .await
            .expect("emit failed");
        persister.flush().await.expect("flush failed");

        let total = persister
            .count_events(None)
            .await
            .expect("count all failed");
        assert_eq!(total, 3);

        let task_count = persister
            .count_events(Some("task-started"))
            .await
            .expect("count task failed");
        assert_eq!(task_count, 2);

        let worker_count = persister
            .count_events(Some("worker-online"))
            .await
            .expect("count worker failed");
        assert_eq!(worker_count, 1);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// `emit_batch` writes every event and advances the written-events counter.
    #[tokio::test]
    async fn test_file_persister_batch() {
        let dir = std::env::temp_dir().join(format!("celers_test_batch_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(true);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        let events = vec![
            make_task_event("batch_1"),
            make_task_event("batch_2"),
            make_worker_event(),
        ];

        persister
            .emit_batch(events)
            .await
            .expect("emit_batch failed");

        let total = persister.count_events(None).await.expect("count failed");
        assert_eq!(total, 3);
        assert_eq!(persister.events_written(), 3);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }

    /// A disabled persister accepts `emit` calls but writes nothing.
    #[tokio::test]
    async fn test_file_persister_disabled() {
        let dir = std::env::temp_dir().join(format!("celers_test_dis_{}", Uuid::new_v4()));
        let config = FileEventPersisterConfig::default()
            .with_directory(&dir)
            .with_enabled(false);

        let persister = FileEventPersister::new(config)
            .await
            .expect("Failed to create persister");

        assert!(!persister.is_enabled());

        persister
            .emit(make_task_event("ignored"))
            .await
            .expect("emit failed");
        assert_eq!(persister.events_written(), 0);

        let _ = tokio::fs::remove_dir_all(&dir).await;
    }
}