1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use crate::Consumer;
use actix::prelude::*;

// #[derive(Message)]
// #[rtype(result = "()")]
// pub enum WorkerManagement {
//     Status,
//     Stop,
//     Restart,
// }

// pub struct WorkerStatus {
//     load: u64,
//     running: bool,
//     since: chrono::DateTime<chrono::Local>,
//     state: String,
// }

// pub enum WorkerResponse {
//     Status(WorkerStatus)
//     Action()
// }

/// Builder-style handle used to register consumer actors and then wait for shutdown.
///
/// The type currently carries no state: registration methods take `self` by value and
/// hand it straight back so calls can be chained. `Default` is derived so that
/// `Worker::default()` is available alongside the zero-argument `Worker::new()`.
#[derive(Debug, Default)]
pub struct Worker {
    //addrs: Vec<Recipient<WorkerManagement>>,
}

impl Worker {
    /// Creates a `Worker` with nothing registered yet.
    pub fn new() -> Self {
        // No fields to initialise for now (the `addrs` list is still commented out).
        Self {}
    }

    /// Registers a single consumer produced by `factory`.
    ///
    /// Shorthand for [`Worker::register_with_threads`] with a `count` of 1.
    pub fn register<F, C>(self, factory: F) -> Self
    where
        F: Fn() -> C,
        C: 'static + Consumer + Send + Actor<Context = actix::Context<C>>, // + Handler<WorkerManagement>,
    {
        self.register_with_threads(1, factory)
    }

    /// Registers `count` consumers, calling `factory` once per consumer.
    ///
    /// Each consumer is started as an actor in its own freshly created [`Arbiter`].
    /// A `count` of 0 calls `factory` zero times and starts nothing.
    ///
    /// Returns `self` unchanged so that registrations can be chained.
    pub fn register_with_threads<F, C>(self, count: usize, factory: F) -> Self
    where
        F: Fn() -> C,
        C: 'static + Consumer + Send + Actor<Context = actix::Context<C>>, // + Handler<WorkerManagement>,
    {
        // `repeat_with` is lazy, so each consumer is built right before it is started.
        std::iter::repeat_with(&factory)
            .take(count)
            .for_each(|consumer| {
                // NOTE(review): the returned address is discarded here; confirm that
                // consumers keep themselves alive without an outstanding `Addr`.
                let _addr = C::start_in_arbiter(&Arbiter::new(), move |_| consumer);
                // self.addrs.push(addr.into());
            });
        self
    }

    /// Waits until `actix_rt::signal::ctrl_c()` resolves, then returns.
    ///
    /// # Panics
    ///
    /// Panics with "failed to listen for ctrl_c" if waiting for the signal yields an error.
    pub async fn run(self) {
        let interrupt = actix_rt::signal::ctrl_c().await;
        interrupt.expect("failed to listen for ctrl_c");
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal consumer with no state, used only to exercise registration.
    struct TestConsumer;

    impl Actor for TestConsumer {
        type Context = actix::Context<Self>;
    }

    impl Consumer for TestConsumer {}

    /// Smoke test: registering two consumers completes and hands the worker back.
    #[actix_rt::test]
    async fn test_worker() {
        let registered = Worker::new().register_with_threads(2, || TestConsumer);
        // `Worker` exposes no observable state, so this only checks that a value came back.
        let wrapped = Some(registered);
        assert!(wrapped.is_some())
    }
}