karmen/
lib.rs

1use karmen::karmen_client::KarmenClient;
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::time::UNIX_EPOCH;
6use tokio::sync::mpsc;
7use tokio::sync::Mutex;
8
9pub mod karmen {
10    tonic::include_proto!("karmen");
11}
12
13pub struct Karmen {
14    name: String,
15    client: KarmenClient<tonic::transport::Channel>,
16    actions: Arc<Mutex<HashMap<String, fn(parameters: HashMap<String, String>) -> karmen::Result>>>,
17}
18
19#[tonic::async_trait]
20pub trait KarmenTraits {
21    async fn new(name: &str, host: &str, port: u16)
22        -> Result<Arc<Karmen>, tonic::transport::Error>;
23    async fn ping(&self) -> Result<String, Box<dyn std::error::Error>>;
24    async fn register(&self) -> Result<karmen::Result, Box<dyn std::error::Error>>;
25    async fn handle_actions(&self) -> Result<(), Box<dyn std::error::Error>>;
26    async fn add_action(
27        &self,
28        action: fn(parameters: HashMap<String, String>) -> karmen::Result,
29        name: &str,
30    ) -> Result<(), Box<dyn std::error::Error>>;
31    async fn run_event(
32        &self,
33        name: &str,
34        params: HashMap<String, String>,
35    ) -> Result<karmen::Result, Box<dyn std::error::Error>>;
36}
37
38#[tonic::async_trait]
39impl KarmenTraits for Karmen {
40    async fn new(
41        name: &str,
42        host: &str,
43        port: u16,
44    ) -> Result<Arc<Karmen>, tonic::transport::Error> {
45        let client = KarmenClient::connect(format!("http://{}:{}", host, port)).await?;
46        Ok(Arc::new(Karmen {
47            name: name.to_string(),
48            client: client,
49            actions: Arc::new(Mutex::new(HashMap::new())),
50        }))
51    }
52    async fn ping(&self) -> Result<String, Box<dyn std::error::Error>> {
53        let request = tonic::Request::new(karmen::Ping {
54            message: "Rusty Karmen!".into(),
55        });
56        let response = self.client.clone().ping_pong(request).await?.into_inner();
57        Ok(response.message)
58    }
59    async fn register(&self) -> Result<karmen::Result, Box<dyn std::error::Error>> {
60        let request = tonic::Request::new(karmen::RegisterRequest {
61            name: self.name.clone(),
62            timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64,
63            // no actions or events are sent as part of registration
64            // TODO: investigate if these can be removed from the protobuf definition
65            actions: HashMap::new(),
66            events: HashMap::new(),
67        });
68        let response = self.client.clone().register(request).await?.into_inner();
69        match response.result {
70            Some(r) => {
71                if r.code != 200 {
72                    println!("{}", r.code);
73                }
74                Ok(r)
75            }
76            None => Err(Box::new(std::io::Error::new(
77                std::io::ErrorKind::Other,
78                "No response",
79            ))),
80        }
81    }
82    async fn handle_actions(&self) -> Result<(), Box<dyn std::error::Error>> {
83        //send an action response with our details one time. For the rest of time, sleep
84        let name = self.name.clone();
85        let (tx, mut rx): (
86            mpsc::Sender<karmen::ActionResponse>,
87            mpsc::Receiver<karmen::ActionResponse>,
88        ) = mpsc::channel(32);
89        let request = async_stream::stream! {
90            // Send our details once
91            let response = karmen::ActionResponse {
92                hostname: name.clone(),
93                // unused on inital message
94                request: None,
95                result: None,
96            };
97            yield response;
98            // Listen for more outgoing messages from the queue
99            while let Some(response) = rx.recv().await {
100                yield response;
101            }
102            println!("{}", "Closing stream");
103        };
104        let mut stream = self
105            .client
106            .clone()
107            .action_dispatcher(tonic::Request::new(request))
108            .await?
109            .into_inner();
110        while let Some(req) = stream.message().await? {
111            println!("Got a request to run {:?}", req);
112            // Prepare the function call with a clone of the request
113            let name = self.name.clone();
114            let tx2 = tx.clone();
115            let action = req.clone().action.unwrap();
116            let action_name = action.action_name.clone();
117            let parameters = action.parameters;
118            // Run the function asyncronously
119            let action_func = self.actions.lock().await.get(&action_name).unwrap().clone();
120            tokio::spawn(async move {
121                let result = action_func(parameters);
122                // Process the result
123                let response = karmen::ActionResponse {
124                    hostname: name,
125                    request: Some(req),
126                    result: Some(result),
127                };
128                // Send the result back through the queue
129                let _ = tx2.send(response).await;
130            });
131        }
132        Ok(())
133    }
134    async fn add_action(
135        &self,
136        action: fn(parameters: HashMap<String, String>) -> karmen::Result,
137        name: &str,
138    ) -> Result<(), Box<dyn std::error::Error>> {
139        self.actions.lock().await.insert(name.to_string(), action);
140        Ok(())
141    }
142    async fn run_event(
143        &self,
144        name: &str,
145        params: HashMap<String, String>,
146    ) -> Result<karmen::Result, Box<dyn std::error::Error>> {
147        println!("Running event {}", name);
148        let event = Some(karmen::Event {
149            event_name: name.to_string(),
150            timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64,
151            // TODO: determine if this is used
152            parameters: HashMap::new(),
153        });
154        let request = tonic::Request::new(karmen::EventRequest {
155            event: event,
156            parameters: params,
157            requester_name: self.name.clone(),
158            // TODO: determine if this is used
159            uuid: "".to_string(),
160        });
161        let response = self.client.clone().emit_event(request).await?.into_inner();
162        match response.result {
163            Some(r) => {
164                if r.code != 200 {
165                    println!("{}", r.code);
166                }
167                Ok(r)
168            }
169            None => Err(Box::new(std::io::Error::new(
170                std::io::ErrorKind::Other,
171                "No response",
172            ))),
173        }
174    }
175}