1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use log::info;
use async_trait::async_trait;

use crate::event_emitter::EventEmitter;
use std::time::{SystemTime, UNIX_EPOCH};
use std::fs::File;
use std::io::Write;
use std::fmt::Debug;

/// Builds a unique, time-prefixed key of the form `<day>-<millis>-<uuid>`.
///
/// * `<day>` is the current time truncated down to a multiple of one day,
///   expressed in milliseconds since the Unix epoch.
/// * `<millis>` is the current time in milliseconds since the Unix epoch.
/// * `<uuid>` is a freshly generated v4 UUID, so two keys produced within the
///   same millisecond still differ.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn time_based_key_fn() -> String {
    // The timestamp below is in milliseconds, so the day bucket must be
    // expressed in milliseconds as well (not the 86_400 seconds in a day).
    const MS_PER_DAY: u128 = 24 * 60 * 60 * 1000;

    let cur_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("SystemTime before UNIX EPOCH!")
        .as_millis();

    let cur_day = cur_ms - (cur_ms % MS_PER_DAY);

    format!("{}-{}-{}", cur_day, cur_ms, uuid::Uuid::new_v4())
}

/// Event emitter that writes events to the local filesystem.
#[derive(Debug)]
pub struct FsEventEmitter {
    /// Location that event files are written to; it is combined with a
    /// generated key to form the path of each file.
    directory: String,
}

impl FsEventEmitter {
    /// Creates an emitter that writes its event files to `directory`.
    ///
    /// Accepts anything convertible into a `String` (for example `&str` or
    /// `String`). The directory is not created or validated here.
    pub fn new(directory: impl Into<String>) -> Self {
        Self {
            directory: directory.into(),
        }
    }
}

/// Filesystem-backed [`EventEmitter`]: every event becomes its own file.
#[async_trait]
impl EventEmitter for FsEventEmitter {
    type Event = Vec<u8>;
    type Error = crate::error::Error<()>;

    /// Writes each event in `completed_events` to a newly created file inside
    /// the emitter's directory.
    ///
    /// The file name of each event comes from `time_based_key_fn`, and the
    /// file content is the raw bytes of the event.
    ///
    /// # Errors
    ///
    /// This implementation currently never returns `Err`; see *Panics*.
    ///
    /// # Panics
    ///
    /// Panics if a file cannot be created, written, or flushed. The panic
    /// message contains the offending path and the underlying I/O error.
    async fn emit_event(&mut self, completed_events: Vec<Self::Event>) -> Result<(), Self::Error> {
        let total_bytes: usize = completed_events.iter().map(Vec::len).sum();
        info!(
            "Writing out {} events ({} bytes)",
            completed_events.len(),
            total_bytes
        );

        // Join as a path so a directory configured without a trailing
        // separator still yields `<directory>/<key>` rather than `<directory><key>`.
        let directory = std::path::Path::new(&self.directory);

        // NOTE(review): these are blocking `std::fs` calls inside an async fn.
        for event in completed_events {
            let path = directory.join(time_based_key_fn());

            let mut file = File::create(&path)
                .unwrap_or_else(|e| panic!("{}: {:?}", path.display(), e));
            file.write_all(&event)
                .unwrap_or_else(|e| panic!("{}: {:?}", path.display(), e));
            file.flush()
                .unwrap_or_else(|e| panic!("{}: {:?}", path.display(), e));
        }

        Ok(())
    }
}