1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
use log::info; use async_trait::async_trait; use crate::event_emitter::EventEmitter; use std::time::{SystemTime, UNIX_EPOCH}; use std::fs::File; use std::io::Write; use std::fmt::Debug; fn time_based_key_fn() -> String { let cur_ms = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(n) => n.as_millis(), Err(_) => panic!("SystemTime before UNIX EPOCH!"), }; let cur_day = cur_ms - (cur_ms % 86400); format!( "{}-{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4() ) } pub struct FsEventEmitter { directory: String } impl FsEventEmitter { pub fn new(directory: impl Into<String>) -> Self { let directory = directory.into(); Self { directory } } } #[async_trait] impl EventEmitter for FsEventEmitter { type Event = Vec<u8>; type Error = crate::error::Error<()>; async fn emit_event(&mut self, completed_events: Vec<Self::Event>) -> Result<(), Self::Error> { info!("Writing out {} bytes", completed_events.len()); for event in completed_events { let path = time_based_key_fn(); let path = format!("{}{}", self.directory, &path); let mut file = File::create(&path).expect(&path); file.write_all(&event).expect(&path); file.flush().expect(&path); } Ok(()) } }