Skip to main content

celers_core/
event_persistence.rs

1//! Event persistence for audit trails and replay
2//!
3//! Provides persistent event storage implementations:
4//! - `FileEventPersister`: JSONL file-based storage with rotation
5//! - `EventPersister` trait: Query and cleanup interface
6
7use crate::event::{Event, EventEmitter};
8use async_trait::async_trait;
9use chrono::{DateTime, NaiveDate, Utc};
10use serde::{Deserialize, Serialize};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::AsyncWriteExt;
16use tokio::sync::RwLock;
17
18/// Trait for persistent event storage with query and cleanup capabilities
19#[async_trait]
20pub trait EventPersister: EventEmitter {
21    /// Query events within a time range, optionally filtering by event type
22    async fn query_events(
23        &self,
24        from: DateTime<Utc>,
25        to: DateTime<Utc>,
26        event_type_filter: Option<&str>,
27    ) -> crate::Result<Vec<Event>>;
28
29    /// Count events, optionally filtering by event type
30    async fn count_events(&self, event_type: Option<&str>) -> crate::Result<u64>;
31
32    /// Remove events older than the given duration, returning the count of removed files
33    async fn cleanup(&self, older_than: chrono::Duration) -> crate::Result<u64>;
34
35    /// Flush any buffered data to persistent storage
36    async fn flush(&self) -> crate::Result<()>;
37}
38
39/// Policy for rotating event log files
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub enum RotationPolicy {
42    /// Rotate daily (new file each calendar day)
43    Daily,
44    /// Rotate when file exceeds max_bytes
45    SizeBased {
46        /// Maximum file size in bytes before rotation
47        max_bytes: u64,
48    },
49    /// Rotate on either daily boundary or size threshold
50    Both {
51        /// Maximum file size in bytes before rotation
52        max_bytes: u64,
53    },
54}
55
56/// Configuration for [`FileEventPersister`]
57#[derive(Debug, Clone)]
58pub struct FileEventPersisterConfig {
59    /// Directory where event JSONL files are stored
60    pub directory: PathBuf,
61    /// Maximum file size in bytes (default: 100MB)
62    pub max_file_size_bytes: u64,
63    /// File rotation policy
64    pub rotation: RotationPolicy,
65    /// Number of days to retain event files (default: 30)
66    pub retention_days: u32,
67    /// How often to flush buffered writes (default: 1s)
68    pub flush_interval: Duration,
69    /// Whether event persistence is enabled
70    pub enabled: bool,
71}
72
73impl Default for FileEventPersisterConfig {
74    fn default() -> Self {
75        Self {
76            directory: PathBuf::from("./events"),
77            max_file_size_bytes: 104_857_600, // 100MB
78            rotation: RotationPolicy::Both {
79                max_bytes: 104_857_600,
80            },
81            retention_days: 30,
82            flush_interval: Duration::from_secs(1),
83            enabled: true,
84        }
85    }
86}
87
88impl FileEventPersisterConfig {
89    /// Set the directory for event files
90    #[must_use]
91    pub fn with_directory(mut self, directory: impl Into<PathBuf>) -> Self {
92        self.directory = directory.into();
93        self
94    }
95
96    /// Set the maximum file size in bytes
97    #[must_use]
98    pub fn with_max_file_size(mut self, max_bytes: u64) -> Self {
99        self.max_file_size_bytes = max_bytes;
100        self
101    }
102
103    /// Set the rotation policy
104    #[must_use]
105    pub fn with_rotation(mut self, rotation: RotationPolicy) -> Self {
106        self.rotation = rotation;
107        self
108    }
109
110    /// Set the retention period in days
111    #[must_use]
112    pub fn with_retention_days(mut self, days: u32) -> Self {
113        self.retention_days = days;
114        self
115    }
116
117    /// Set the flush interval
118    #[must_use]
119    pub fn with_flush_interval(mut self, interval: Duration) -> Self {
120        self.flush_interval = interval;
121        self
122    }
123
124    /// Set whether persistence is enabled
125    #[must_use]
126    pub fn with_enabled(mut self, enabled: bool) -> Self {
127        self.enabled = enabled;
128        self
129    }
130}
131
132/// File-based event persister using JSONL format with rotation
133///
134/// Events are written to date-partitioned JSONL files in the configured directory.
135/// Files are named `events-YYYY-MM-DD.jsonl` with optional index suffix for
136/// size-based rotation: `events-YYYY-MM-DD.N.jsonl`.
137pub struct FileEventPersister {
138    config: FileEventPersisterConfig,
139    current_file: Arc<RwLock<Option<tokio::io::BufWriter<tokio::fs::File>>>>,
140    current_file_size: Arc<AtomicU64>,
141    current_file_date: Arc<RwLock<NaiveDate>>,
142    events_written: Arc<AtomicU64>,
143}
144
145impl FileEventPersister {
146    /// Create a new file-based event persister
147    ///
148    /// Creates the configured directory if it does not exist and opens
149    /// the initial event file for writing.
150    pub async fn new(config: FileEventPersisterConfig) -> crate::Result<Self> {
151        tokio::fs::create_dir_all(&config.directory).await?;
152
153        let today = Utc::now().date_naive();
154        let persister = Self {
155            config,
156            current_file: Arc::new(RwLock::new(None)),
157            current_file_size: Arc::new(AtomicU64::new(0)),
158            current_file_date: Arc::new(RwLock::new(today)),
159            events_written: Arc::new(AtomicU64::new(0)),
160        };
161
162        persister.ensure_file().await?;
163        Ok(persister)
164    }
165
166    /// Get the total number of events written since creation
167    #[must_use]
168    pub fn events_written(&self) -> u64 {
169        self.events_written.load(Ordering::Relaxed)
170    }
171
172    /// Ensure a file is open and ready for writing, rotating if necessary
173    async fn ensure_file(&self) -> crate::Result<()> {
174        let today = Utc::now().date_naive();
175        let mut file_guard = self.current_file.write().await;
176        let mut date_guard = self.current_file_date.write().await;
177
178        let needs_new_file = file_guard.is_none() || *date_guard != today;
179
180        if needs_new_file {
181            // Flush and drop old file
182            if let Some(ref mut writer) = *file_guard {
183                writer.flush().await?;
184            }
185            *file_guard = None;
186
187            *date_guard = today;
188            self.current_file_size.store(0, Ordering::Relaxed);
189
190            let path = self.file_path_for_date(today);
191            let file = tokio::fs::OpenOptions::new()
192                .create(true)
193                .append(true)
194                .open(&path)
195                .await?;
196
197            let metadata = file.metadata().await?;
198            self.current_file_size
199                .store(metadata.len(), Ordering::Relaxed);
200
201            *file_guard = Some(tokio::io::BufWriter::new(file));
202        }
203
204        Ok(())
205    }
206
207    /// Check if rotation is needed based on the configured policy and rotate if so
208    async fn rotate_if_needed(&self) -> crate::Result<()> {
209        let needs_rotation = match &self.config.rotation {
210            RotationPolicy::Daily => {
211                let today = Utc::now().date_naive();
212                let date_guard = self.current_file_date.read().await;
213                *date_guard != today
214            }
215            RotationPolicy::SizeBased { max_bytes } => {
216                self.current_file_size.load(Ordering::Relaxed) >= *max_bytes
217            }
218            RotationPolicy::Both { max_bytes } => {
219                let today = Utc::now().date_naive();
220                let date_guard = self.current_file_date.read().await;
221                *date_guard != today || self.current_file_size.load(Ordering::Relaxed) >= *max_bytes
222            }
223        };
224
225        if needs_rotation {
226            self.perform_rotation().await?;
227        }
228
229        Ok(())
230    }
231
232    /// Perform the actual file rotation
233    async fn perform_rotation(&self) -> crate::Result<()> {
234        let mut file_guard = self.current_file.write().await;
235
236        // Flush and close current file
237        if let Some(ref mut writer) = *file_guard {
238            writer.flush().await?;
239        }
240        *file_guard = None;
241
242        let today = Utc::now().date_naive();
243        let mut date_guard = self.current_file_date.write().await;
244        *date_guard = today;
245
246        // Find the next available index for today
247        let mut index = 0u32;
248        loop {
249            let path = if index == 0 {
250                self.file_path_for_date(today)
251            } else {
252                self.file_path_with_index(today, index)
253            };
254
255            if !path.exists() {
256                let file = tokio::fs::OpenOptions::new()
257                    .create(true)
258                    .append(true)
259                    .open(&path)
260                    .await?;
261                self.current_file_size.store(0, Ordering::Relaxed);
262                *file_guard = Some(tokio::io::BufWriter::new(file));
263                return Ok(());
264            }
265
266            // Check if existing file is under size limit
267            let metadata = tokio::fs::metadata(&path).await?;
268            if metadata.len() < self.config.max_file_size_bytes {
269                let file = tokio::fs::OpenOptions::new()
270                    .create(true)
271                    .append(true)
272                    .open(&path)
273                    .await?;
274                self.current_file_size
275                    .store(metadata.len(), Ordering::Relaxed);
276                *file_guard = Some(tokio::io::BufWriter::new(file));
277                return Ok(());
278            }
279
280            index = index.saturating_add(1);
281            if index > 10_000 {
282                return Err(crate::CelersError::Other(
283                    "Too many rotation files for a single day".to_string(),
284                ));
285            }
286        }
287    }
288
289    /// Get the file path for a given date (primary file)
290    fn file_path_for_date(&self, date: NaiveDate) -> PathBuf {
291        self.config
292            .directory
293            .join(format!("events-{}.jsonl", date.format("%Y-%m-%d")))
294    }
295
296    /// Get the file path for a given date with a rotation index
297    fn file_path_with_index(&self, date: NaiveDate, index: u32) -> PathBuf {
298        self.config.directory.join(format!(
299            "events-{}.{}.jsonl",
300            date.format("%Y-%m-%d"),
301            index
302        ))
303    }
304
305    /// List all event files in the directory
306    async fn list_event_files(&self) -> crate::Result<Vec<PathBuf>> {
307        let mut files = Vec::new();
308        let mut entries = tokio::fs::read_dir(&self.config.directory).await?;
309
310        while let Some(entry) = entries.next_entry().await? {
311            let path = entry.path();
312            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
313                if name.starts_with("events-") && name.ends_with(".jsonl") {
314                    files.push(path);
315                }
316            }
317        }
318
319        files.sort();
320        Ok(files)
321    }
322
323    /// Parse the date from an event file name
324    fn parse_file_date(path: &Path) -> Option<NaiveDate> {
325        let name = path.file_name()?.to_str()?;
326        // Format: events-YYYY-MM-DD.jsonl or events-YYYY-MM-DD.N.jsonl
327        let date_str = name.strip_prefix("events-")?;
328        let date_part = if let Some(pos) = date_str.find('.') {
329            &date_str[..pos]
330        } else {
331            date_str
332        };
333        NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok()
334    }
335
336    /// Read and parse events from a single JSONL file
337    async fn read_events_from_file(path: &Path) -> crate::Result<Vec<Event>> {
338        let content = tokio::fs::read_to_string(path).await?;
339        let mut events = Vec::new();
340
341        for line in content.lines() {
342            let trimmed = line.trim();
343            if trimmed.is_empty() {
344                continue;
345            }
346            match serde_json::from_str::<Event>(trimmed) {
347                Ok(event) => events.push(event),
348                Err(e) => {
349                    tracing::warn!("Failed to parse event line in {:?}: {}", path, e);
350                }
351            }
352        }
353
354        Ok(events)
355    }
356}
357
358#[async_trait]
359impl EventEmitter for FileEventPersister {
360    async fn emit(&self, event: Event) -> crate::Result<()> {
361        if !self.config.enabled {
362            return Ok(());
363        }
364
365        self.ensure_file().await?;
366
367        let json = serde_json::to_string(&event).map_err(|e| {
368            crate::CelersError::Serialization(format!("Failed to serialize event: {}", e))
369        })?;
370
371        let line = format!("{}\n", json);
372        let line_bytes = line.as_bytes();
373
374        {
375            let mut file_guard = self.current_file.write().await;
376            if let Some(ref mut writer) = *file_guard {
377                writer.write_all(line_bytes).await?;
378                self.current_file_size
379                    .fetch_add(line_bytes.len() as u64, Ordering::Relaxed);
380                self.events_written.fetch_add(1, Ordering::Relaxed);
381            }
382        }
383
384        self.rotate_if_needed().await?;
385
386        Ok(())
387    }
388
389    async fn emit_batch(&self, events: Vec<Event>) -> crate::Result<()> {
390        if !self.config.enabled {
391            return Ok(());
392        }
393
394        self.ensure_file().await?;
395
396        let mut total_bytes = 0u64;
397        let mut lines = String::new();
398
399        for event in &events {
400            let json = serde_json::to_string(event).map_err(|e| {
401                crate::CelersError::Serialization(format!("Failed to serialize event: {}", e))
402            })?;
403            lines.push_str(&json);
404            lines.push('\n');
405        }
406
407        let line_bytes = lines.as_bytes();
408        total_bytes += line_bytes.len() as u64;
409
410        {
411            let mut file_guard = self.current_file.write().await;
412            if let Some(ref mut writer) = *file_guard {
413                writer.write_all(line_bytes).await?;
414                writer.flush().await?;
415            }
416        }
417
418        self.current_file_size
419            .fetch_add(total_bytes, Ordering::Relaxed);
420        self.events_written
421            .fetch_add(events.len() as u64, Ordering::Relaxed);
422
423        self.rotate_if_needed().await?;
424
425        Ok(())
426    }
427
428    fn is_enabled(&self) -> bool {
429        self.config.enabled
430    }
431}
432
433#[async_trait]
434impl EventPersister for FileEventPersister {
435    async fn query_events(
436        &self,
437        from: DateTime<Utc>,
438        to: DateTime<Utc>,
439        event_type_filter: Option<&str>,
440    ) -> crate::Result<Vec<Event>> {
441        // Flush before querying to ensure all buffered data is written
442        self.flush().await?;
443
444        let from_date = from.date_naive();
445        let to_date = to.date_naive();
446
447        let files = self.list_event_files().await?;
448        let mut result = Vec::new();
449
450        for file_path in &files {
451            // Filter by file date
452            if let Some(file_date) = Self::parse_file_date(file_path) {
453                if file_date < from_date || file_date > to_date {
454                    continue;
455                }
456            }
457
458            let events = Self::read_events_from_file(file_path).await?;
459
460            for event in events {
461                let ts = event.timestamp();
462                if ts >= from && ts <= to {
463                    if let Some(filter) = event_type_filter {
464                        if event.event_type() == filter {
465                            result.push(event);
466                        }
467                    } else {
468                        result.push(event);
469                    }
470                }
471            }
472        }
473
474        Ok(result)
475    }
476
477    async fn count_events(&self, event_type: Option<&str>) -> crate::Result<u64> {
478        self.flush().await?;
479
480        let files = self.list_event_files().await?;
481        let mut count = 0u64;
482
483        for file_path in &files {
484            let events = Self::read_events_from_file(file_path).await?;
485            for event in &events {
486                if let Some(filter) = event_type {
487                    if event.event_type() == filter {
488                        count += 1;
489                    }
490                } else {
491                    count += 1;
492                }
493            }
494        }
495
496        Ok(count)
497    }
498
499    async fn cleanup(&self, older_than: chrono::Duration) -> crate::Result<u64> {
500        let cutoff = Utc::now()
501            .date_naive()
502            .checked_sub_signed(older_than)
503            .ok_or_else(|| {
504                crate::CelersError::Other("Invalid duration for cleanup cutoff".to_string())
505            })?;
506
507        let files = self.list_event_files().await?;
508        let mut removed = 0u64;
509
510        for file_path in &files {
511            if let Some(file_date) = Self::parse_file_date(file_path) {
512                if file_date < cutoff {
513                    tokio::fs::remove_file(file_path).await?;
514                    removed += 1;
515                }
516            }
517        }
518
519        Ok(removed)
520    }
521
522    async fn flush(&self) -> crate::Result<()> {
523        let mut file_guard = self.current_file.write().await;
524        if let Some(ref mut writer) = *file_guard {
525            writer.flush().await?;
526        }
527        Ok(())
528    }
529}
530
531#[cfg(test)]
532mod tests {
533    use super::*;
534    use crate::event::{TaskEvent, WorkerEvent};
535    use uuid::Uuid;
536
537    fn make_task_event(name: &str) -> Event {
538        Event::Task(TaskEvent::Started {
539            task_id: Uuid::new_v4(),
540            task_name: name.to_string(),
541            hostname: "test-worker".to_string(),
542            timestamp: Utc::now(),
543            pid: 1234,
544        })
545    }
546
547    fn make_worker_event() -> Event {
548        Event::Worker(WorkerEvent::Online {
549            hostname: "test-worker".to_string(),
550            timestamp: Utc::now(),
551            sw_ident: "celers".to_string(),
552            sw_ver: "0.2.0".to_string(),
553            sw_sys: "linux".to_string(),
554        })
555    }
556
557    #[test]
558    fn test_file_persister_config_defaults() {
559        let config = FileEventPersisterConfig::default();
560        assert_eq!(config.max_file_size_bytes, 104_857_600);
561        assert_eq!(config.retention_days, 30);
562        assert_eq!(config.flush_interval, Duration::from_secs(1));
563        assert!(config.enabled);
564        assert_eq!(config.directory, PathBuf::from("./events"));
565    }
566
567    #[tokio::test]
568    async fn test_file_persister_write_and_read() {
569        let dir = std::env::temp_dir().join(format!("celers_test_wr_{}", Uuid::new_v4()));
570        let config = FileEventPersisterConfig::default()
571            .with_directory(&dir)
572            .with_enabled(true);
573
574        let persister = FileEventPersister::new(config)
575            .await
576            .expect("Failed to create persister");
577
578        let event = make_task_event("test_task");
579        persister.emit(event.clone()).await.expect("emit failed");
580        persister.flush().await.expect("flush failed");
581
582        let from = Utc::now() - chrono::Duration::hours(1);
583        let to = Utc::now() + chrono::Duration::hours(1);
584        let events = persister
585            .query_events(from, to, None)
586            .await
587            .expect("query failed");
588
589        assert_eq!(events.len(), 1);
590        assert_eq!(events[0].event_type(), "task-started");
591
592        // Cleanup temp directory
593        let _ = tokio::fs::remove_dir_all(&dir).await;
594    }
595
596    #[tokio::test]
597    async fn test_file_persister_rotation_by_size() {
598        let dir = std::env::temp_dir().join(format!("celers_test_rot_{}", Uuid::new_v4()));
599        let config = FileEventPersisterConfig::default()
600            .with_directory(&dir)
601            .with_max_file_size(100) // very small to trigger rotation
602            .with_rotation(RotationPolicy::SizeBased { max_bytes: 100 });
603
604        let persister = FileEventPersister::new(config)
605            .await
606            .expect("Failed to create persister");
607
608        // Emit enough events to trigger rotation
609        for i in 0..20 {
610            let event = make_task_event(&format!("task_{}", i));
611            persister.emit(event).await.expect("emit failed");
612        }
613        persister.flush().await.expect("flush failed");
614
615        // Check that multiple files were created
616        let files = persister
617            .list_event_files()
618            .await
619            .expect("list files failed");
620        assert!(
621            files.len() > 1,
622            "Expected multiple files after rotation, got {}",
623            files.len()
624        );
625
626        let _ = tokio::fs::remove_dir_all(&dir).await;
627    }
628
629    #[tokio::test]
630    async fn test_file_persister_cleanup() {
631        let dir = std::env::temp_dir().join(format!("celers_test_cl_{}", Uuid::new_v4()));
632        tokio::fs::create_dir_all(&dir)
633            .await
634            .expect("create dir failed");
635
636        // Create an old event file manually
637        let old_date = NaiveDate::from_ymd_opt(2020, 1, 1).expect("invalid date");
638        let old_file = dir.join(format!("events-{}.jsonl", old_date.format("%Y-%m-%d")));
639        tokio::fs::write(&old_file, "{}\n")
640            .await
641            .expect("write old file failed");
642
643        let config = FileEventPersisterConfig::default()
644            .with_directory(&dir)
645            .with_enabled(true);
646
647        let persister = FileEventPersister::new(config)
648            .await
649            .expect("Failed to create persister");
650
651        // Cleanup anything older than 1 day
652        let removed = persister
653            .cleanup(chrono::Duration::days(1))
654            .await
655            .expect("cleanup failed");
656
657        assert!(removed >= 1, "Expected at least 1 file removed");
658
659        // Verify old file is gone
660        assert!(!old_file.exists());
661
662        let _ = tokio::fs::remove_dir_all(&dir).await;
663    }
664
665    #[tokio::test]
666    async fn test_file_persister_query_with_filter() {
667        let dir = std::env::temp_dir().join(format!("celers_test_filt_{}", Uuid::new_v4()));
668        let config = FileEventPersisterConfig::default()
669            .with_directory(&dir)
670            .with_enabled(true);
671
672        let persister = FileEventPersister::new(config)
673            .await
674            .expect("Failed to create persister");
675
676        // Emit task and worker events
677        persister
678            .emit(make_task_event("my_task"))
679            .await
680            .expect("emit task failed");
681        persister
682            .emit(make_worker_event())
683            .await
684            .expect("emit worker failed");
685        persister
686            .emit(make_task_event("another_task"))
687            .await
688            .expect("emit task2 failed");
689        persister.flush().await.expect("flush failed");
690
691        let from = Utc::now() - chrono::Duration::hours(1);
692        let to = Utc::now() + chrono::Duration::hours(1);
693
694        // Filter for task-started only
695        let task_events = persister
696            .query_events(from, to, Some("task-started"))
697            .await
698            .expect("query failed");
699        assert_eq!(task_events.len(), 2);
700
701        // Filter for worker-online only
702        let worker_events = persister
703            .query_events(from, to, Some("worker-online"))
704            .await
705            .expect("query failed");
706        assert_eq!(worker_events.len(), 1);
707
708        let _ = tokio::fs::remove_dir_all(&dir).await;
709    }
710
711    #[tokio::test]
712    async fn test_file_persister_count() {
713        let dir = std::env::temp_dir().join(format!("celers_test_cnt_{}", Uuid::new_v4()));
714        let config = FileEventPersisterConfig::default()
715            .with_directory(&dir)
716            .with_enabled(true);
717
718        let persister = FileEventPersister::new(config)
719            .await
720            .expect("Failed to create persister");
721
722        persister
723            .emit(make_task_event("t1"))
724            .await
725            .expect("emit failed");
726        persister
727            .emit(make_task_event("t2"))
728            .await
729            .expect("emit failed");
730        persister
731            .emit(make_worker_event())
732            .await
733            .expect("emit failed");
734        persister.flush().await.expect("flush failed");
735
736        let total = persister
737            .count_events(None)
738            .await
739            .expect("count all failed");
740        assert_eq!(total, 3);
741
742        let task_count = persister
743            .count_events(Some("task-started"))
744            .await
745            .expect("count task failed");
746        assert_eq!(task_count, 2);
747
748        let worker_count = persister
749            .count_events(Some("worker-online"))
750            .await
751            .expect("count worker failed");
752        assert_eq!(worker_count, 1);
753
754        let _ = tokio::fs::remove_dir_all(&dir).await;
755    }
756
757    #[tokio::test]
758    async fn test_file_persister_batch() {
759        let dir = std::env::temp_dir().join(format!("celers_test_batch_{}", Uuid::new_v4()));
760        let config = FileEventPersisterConfig::default()
761            .with_directory(&dir)
762            .with_enabled(true);
763
764        let persister = FileEventPersister::new(config)
765            .await
766            .expect("Failed to create persister");
767
768        let events = vec![
769            make_task_event("batch_1"),
770            make_task_event("batch_2"),
771            make_worker_event(),
772        ];
773
774        persister
775            .emit_batch(events)
776            .await
777            .expect("emit_batch failed");
778
779        let total = persister.count_events(None).await.expect("count failed");
780        assert_eq!(total, 3);
781        assert_eq!(persister.events_written(), 3);
782
783        let _ = tokio::fs::remove_dir_all(&dir).await;
784    }
785
786    #[tokio::test]
787    async fn test_file_persister_disabled() {
788        let dir = std::env::temp_dir().join(format!("celers_test_dis_{}", Uuid::new_v4()));
789        let config = FileEventPersisterConfig::default()
790            .with_directory(&dir)
791            .with_enabled(false);
792
793        let persister = FileEventPersister::new(config)
794            .await
795            .expect("Failed to create persister");
796
797        assert!(!persister.is_enabled());
798
799        // Emitting should be a no-op
800        persister
801            .emit(make_task_event("ignored"))
802            .await
803            .expect("emit failed");
804        assert_eq!(persister.events_written(), 0);
805
806        let _ = tokio::fs::remove_dir_all(&dir).await;
807    }
808}