1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use busybody::ServiceContainer;
use std::sync::Arc;

#[derive(Clone)]
pub struct PipeContent(pub(crate) Arc<ServiceContainer>);

#[derive(Debug, PartialEq, PartialOrd)]
pub(crate) enum PipeState {
    Run,
    Stop,
}

impl Default for PipeContent {
    fn default() -> Self {
        let container = Arc::new(ServiceContainer::proxy());
        container.set(PipeState::Run);

        let pipe = Self(container);
        pipe.container().set_type(pipe.clone()).get_type().unwrap()
    }
}

#[busybody::async_trait]
impl busybody::Injectable for PipeContent {
    async fn inject(c: &ServiceContainer) -> Self {
        c.get_type().unwrap_or_default()
    }
}

impl PipeContent {
    pub fn new<T>(content: T) -> Self
    where
        T: Clone + Send + Sync + 'static,
    {
        let pipe = Self::default();
        pipe.container().set_type(content);
        pipe
    }
    /// Returns a busybody's ServiceContainer
    pub fn container(&self) -> &Arc<ServiceContainer> {
        &self.0
    }

    pub fn store<T: Clone + Send + Sync + 'static>(&self, data: T) -> &Self {
        self.container().set_type(data);
        self
    }

    /// Notify the pipeline to stop flowing the content
    pub fn stop_the_flow(&self) {
        self.container().set(PipeState::Stop);
    }

    /// Alias for `stop_the_flow`
    pub fn stop(&self) {
        self.stop_the_flow();
    }
}

#[cfg(test)]
mod test {
    use busybody::helpers::provide;

    use super::*;

    #[tokio::test]
    async fn test_flow_is_running() {
        let pipe: PipeContent = provide().await;

        assert_eq!(
            *pipe.container().get::<PipeState>().unwrap(),
            PipeState::Run,
            "The pipe state should have been 'run'"
        );
    }

    #[tokio::test]
    async fn test_flow_stop() {
        let pipe: PipeContent = provide().await;
        pipe.stop();

        assert_eq!(
            *pipe.container().get::<PipeState>().unwrap(),
            PipeState::Stop
        );
    }
}