// drasi_source_sdk/reactivator.rs

use std::{env, panic, io::Write};
use std::fs::OpenOptions;
use std::{future::Future, pin::Pin, sync::Arc};

use futures::{Stream, StreamExt};
use thiserror::Error;

use crate::dapr_statestore::DaprStateStore;
use crate::dapr_publisher::DaprPublisher;
use crate::telemetry::init_tracer;
use crate::{models::SourceChange, Publisher, StateStore};

/// Errors that a reactivator's stream producer can report.
///
/// This is the error half of the `Result` that the stream-producer future
/// resolves to (see the `Response` bound on [`ReactivatorBuilder`]).
#[derive(Error, Debug)]
pub enum ReactivatorError {
    /// A failure described only by a message.
    /// Displayed as `Internal error: <message>`.
    #[error("Internal error: {0}")]
    InternalError(String),

    /// A failure that wraps an underlying boxed error.
    /// Displayed as `Error from state store: <error>`.
    // NOTE(review): the boxed error is not `Send + Sync`, so this enum is
    // neither; confirm that is intended before moving it across threads.
    #[error("Error from state store: {0}")]
    StateStoreError(Box<dyn std::error::Error>),
}

/// A pinned, heap-allocated, `Send` stream whose items are [`SourceChange`] values.
///
/// This is the success type a stream producer must resolve to.
pub type ChangeStream = Pin<Box<dyn Stream<Item = SourceChange> + Send>>;

/// Builder that collects the pieces needed to construct a [`Reactivator`].
///
/// `Response` is the future type returned by the stream-producer function; it
/// must resolve to either a [`ChangeStream`] or a [`ReactivatorError`].
pub struct ReactivatorBuilder<Response>
where
    Response: Future<Output = Result<ChangeStream, ReactivatorError>> + Send + 'static,
{
    // Function pointer that receives the shared state store and returns the
    // future producing the change stream. Must be set before `build`.
    stream_producer: Option<fn(Arc<dyn StateStore + Send + Sync>) -> Response>,
    // Optional publisher override; `build` falls back to `DaprPublisher::new()`.
    publisher: Option<Box<dyn Publisher>>,
    // Optional state store override; `build` falls back to
    // `DaprStateStore::connect()`.
    state_store: Option<Arc<dyn StateStore + Send + Sync>>,
}

impl<Response> ReactivatorBuilder<Response>
where
    Response: Future<Output = Result<ChangeStream, ReactivatorError>> + Send,
{
    /// Creates an empty builder: no stream producer, publisher or state store set.
    pub fn new() -> Self {
        Self {
            stream_producer: None,
            publisher: None,
            state_store: None,
        }
    }

    /// Sets the function that produces the change stream.
    ///
    /// The function receives the shared state store and returns a future that
    /// resolves to a [`ChangeStream`] or a [`ReactivatorError`].
    pub fn with_stream_producer(
        self,
        stream_producer: fn(Arc<dyn StateStore + Send + Sync>) -> Response,
    ) -> Self {
        Self {
            stream_producer: Some(stream_producer),
            ..self
        }
    }

    /// Overrides the publisher that receives each change from the stream.
    pub fn with_publisher(self, publisher: impl Publisher + 'static) -> Self {
        let boxed: Box<dyn Publisher> = Box::new(publisher);
        Self {
            publisher: Some(boxed),
            ..self
        }
    }

    /// Overrides the state store handed to the stream producer.
    pub fn with_state_store(self, state_store: impl StateStore + Send + Sync + 'static) -> Self {
        let shared: Arc<dyn StateStore + Send + Sync> = Arc::new(state_store);
        Self {
            state_store: Some(shared),
            ..self
        }
    }

    /// Consumes the builder and assembles the [`Reactivator`].
    ///
    /// A missing publisher defaults to `DaprPublisher::new()`; a missing state
    /// store defaults to the result of `DaprStateStore::connect()`.
    ///
    /// # Panics
    ///
    /// Panics if no stream producer was supplied, or if the default state
    /// store has to be created and `DaprStateStore::connect()` returns an error.
    pub async fn build(self) -> Reactivator<Response> {
        // Resolved in the same order as the fields are declared so that the
        // "missing producer" panic still wins over any connection attempt.
        let stream_fn = self.stream_producer.expect("Stream producer is required");

        let publisher: Box<dyn Publisher> = match self.publisher {
            Some(custom) => custom,
            None => Box::new(DaprPublisher::new()),
        };

        let state_store: Arc<dyn StateStore + Send + Sync> = match self.state_store {
            Some(custom) => custom,
            None => {
                let connected = DaprStateStore::connect().await.unwrap();
                Arc::new(connected)
            }
        };

        Reactivator {
            stream_fn,
            publisher,
            state_store,
        }
    }
}

/// Pulls changes from a producer-supplied stream and forwards each one to a
/// publisher.
///
/// Constructed through [`ReactivatorBuilder`]; `Response` is the future type
/// returned by the stream-producer function.
pub struct Reactivator<Response>
where
    Response: Future<Output = Result<ChangeStream, ReactivatorError>> + Send + 'static,
{
    // Function pointer invoked once by `start` to obtain the change stream.
    stream_fn: fn(Arc<dyn StateStore + Send + Sync>) -> Response,
    // Destination for every item yielded by the change stream.
    publisher: Box<dyn Publisher>,
    // Shared state store; a clone of the `Arc` is passed to `stream_fn`.
    state_store: Arc<dyn StateStore + Send + Sync>,
}

impl<Response> Reactivator<Response>
where
    Response: Future<Output = Result<ChangeStream, ReactivatorError>> + Send + 'static,
{
    pub async fn start(&mut self) {
        env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

        panic::set_hook(Box::new(|info| {            
            if let Some(message) = info.payload().downcast_ref::<String>() {                
                log::error!("Panic occurred: {} \n{:?}", message, info.location()); 
                if let Ok(mut file) = OpenOptions::new().create(true).write(true).open("/dev/termination-log") {
                    let _ = writeln!(file, "Panic occurred: {}", message);
                }
            } else if let Some(message) = info.payload().downcast_ref::<&str>() {
                log::error!("Panic occurred: {} \n{:?}", message, info.location()); 
                if let Ok(mut file) = OpenOptions::new().create(true).write(true).open("/dev/termination-log") {
                    let _ = writeln!(file, "Panic occurred: {}", message);
                }
            }           
        }));        
        log::info!("Starting reactivator");
        
        _ = init_tracer(format!("{}-reactivator", env::var("SOURCE_ID").expect("SOURCE_ID required"))).unwrap();

        log::info!("Initialized tracing");

        let producer = &self.stream_fn;
        let state_store = self.state_store.clone();
        let mut stream = producer(state_store).await.unwrap();

        while let Some(data) = stream.next().await {
            if let Err(err) = self.publisher.publish(data).await {
                panic!("Error publishing: {}", err)
            }
        }
    }
}