1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::time::Duration;
use log::{info};
use crossbeam_channel::RecvTimeoutError;
use derive_aktor::derive_actor;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher, Event};
use notify::event::{CreateKind, Event as FsEvent};
use tokio::sync::mpsc::{channel, Receiver, Sender};

use async_trait::async_trait;

use crate::completion_event_serializer::CompletionEventSerializer;
use crate::completion_handler::CompletionHandler;
use crate::error::Error;
use crate::event_handler::OutputEvent;
use crate::event_processor::EventProcessorActor;
use crate::fs_completion_handler::FsCompletionHandlerActor;
use crate::consumer::Consumer;
use std::fmt::Debug;

pub struct FsNotifyConsumerHandler<Err, CE, EventSerializer>
    where
        Err: Debug + Send + Sync + Clone + 'static,
        CE: Send + Sync + Clone + 'static,
        EventSerializer: CompletionEventSerializer<CompletedEvent=CE, Output=Vec<u8>, Error=crate::error::Error<Err>> + Send + Sync + Clone + 'static,
          

{
    watcher: Option<RecommendedWatcher>,
    rx: crossbeam_channel::Receiver<FsEvent>,
    self_actor: Option<FsNotifyConsumerHandlerActor<Err, CE, EventSerializer>>,
    completion_handler: FsCompletionHandlerActor<Err, CE, EventSerializer>,
}

impl<Err, CE, EventSerializer> FsNotifyConsumerHandler<Err, CE, EventSerializer>
    where
        Err: Debug + Send + Sync + Clone + 'static,
        CE: Send + Sync + Clone + 'static,
        EventSerializer: CompletionEventSerializer<CompletedEvent=CE, Output=Vec<u8>, Error=crate::error::Error<Err>> + Send + Sync + Clone + 'static,
          
{
    pub fn new(
        directory: impl AsRef<str>,
        completion_handler: FsCompletionHandlerActor<Err, CE, EventSerializer>,
    ) -> Self {
        let directory = directory.as_ref();
        let (tx, rx) = crossbeam_channel::unbounded();

        let mut watcher: RecommendedWatcher = Watcher::new_immediate(
            move |res: Result<FsEvent, notify::Error>| tx.send(res.unwrap()).unwrap()
        ).expect("Watched failed to init");

        watcher.configure(Config::PreciseEvents(true))
            .expect(&format!("watcher.configure {}", directory));
        watcher.watch(directory, RecursiveMode::Recursive)
            .expect(&format!("watcher.watch {}", directory));

        Self {
            watcher: Some(watcher),
            rx,
            self_actor: None,
            completion_handler,
        }
    }
}

#[derive_actor]
impl<
    Err: Debug + Send + Sync + Clone + 'static,
    CE: Send + Sync + Clone + 'static,
    EventSerializer: CompletionEventSerializer<CompletedEvent=CE, Output=Vec<u8>, Error=crate::error::Error<Err>> + Send + Sync + Clone + 'static,
>
FsNotifyConsumerHandler<Err, CE, EventSerializer>
{
    pub async fn get_next_event(&mut self, event_processor: EventProcessorActor<FsEvent>) {
        info!("Received get_next_event request");
        match self.rx.recv_timeout(Duration::from_secs(2)) {
            Ok(event) => {
                match event.kind {
                    EventKind::Create(CreateKind::File) => {
                        info!("Sending a file create event");
                        event_processor.process_event(
                            event,
                        ).await;
                    }
                    e => {
                        info!("Received non-create event: {:?}", e);
                        self.self_actor.clone().unwrap().get_next_event(event_processor).await;
                    }
                }
            }
            Err(e) => {
                info!("Timeout, forcing ack");
                // self.watcher = None;
                let (tx, shutdown_notify) = tokio::sync::oneshot::channel();
                self.completion_handler.ack_all(Some(tx)).await;

                let _ = shutdown_notify.await;

                self.self_actor.clone().unwrap().get_next_event(event_processor).await;
                    // // event_processor.stop_processing().await;
                    // event_processor.release().await;
                    // self.completion_handler.clone().release().await;
                    // self.self_actor.clone().unwrap().release().await;

            }
        }
    }

    pub fn _phantom(&self, _p: std::marker::PhantomData<(Err, CE, EventSerializer)>) {}
}

#[async_trait]
impl<Err, CE, EventSerializer> Consumer<FsEvent> for FsNotifyConsumerHandlerActor<Err, CE, EventSerializer>
    where
        Err: Debug + Clone + Send + Sync + Clone + 'static,
        CE: Send + Sync + Clone + 'static,
        EventSerializer: CompletionEventSerializer<CompletedEvent=CE, Output=Vec<u8>, Error=crate::error::Error<Err>> + Send + Sync + Clone + 'static,

{
    async fn get_next_event(&self, event_processor: EventProcessorActor<Event>) {
        FsNotifyConsumerHandlerActor::get_next_event(
            self,
            event_processor
        ).await
    }
}