1use karmen::karmen_client::KarmenClient;
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::time::SystemTime;
5use std::time::UNIX_EPOCH;
6use tokio::sync::mpsc;
7use tokio::sync::Mutex;
8
9pub mod karmen {
10 tonic::include_proto!("karmen");
11}
12
13pub struct Karmen {
14 name: String,
15 client: KarmenClient<tonic::transport::Channel>,
16 actions: Arc<Mutex<HashMap<String, fn(parameters: HashMap<String, String>) -> karmen::Result>>>,
17}
18
19#[tonic::async_trait]
20pub trait KarmenTraits {
21 async fn new(name: &str, host: &str, port: u16)
22 -> Result<Arc<Karmen>, tonic::transport::Error>;
23 async fn ping(&self) -> Result<String, Box<dyn std::error::Error>>;
24 async fn register(&self) -> Result<karmen::Result, Box<dyn std::error::Error>>;
25 async fn handle_actions(&self) -> Result<(), Box<dyn std::error::Error>>;
26 async fn add_action(
27 &self,
28 action: fn(parameters: HashMap<String, String>) -> karmen::Result,
29 name: &str,
30 ) -> Result<(), Box<dyn std::error::Error>>;
31 async fn run_event(
32 &self,
33 name: &str,
34 params: HashMap<String, String>,
35 ) -> Result<karmen::Result, Box<dyn std::error::Error>>;
36}
37
38#[tonic::async_trait]
39impl KarmenTraits for Karmen {
40 async fn new(
41 name: &str,
42 host: &str,
43 port: u16,
44 ) -> Result<Arc<Karmen>, tonic::transport::Error> {
45 let client = KarmenClient::connect(format!("http://{}:{}", host, port)).await?;
46 Ok(Arc::new(Karmen {
47 name: name.to_string(),
48 client: client,
49 actions: Arc::new(Mutex::new(HashMap::new())),
50 }))
51 }
52 async fn ping(&self) -> Result<String, Box<dyn std::error::Error>> {
53 let request = tonic::Request::new(karmen::Ping {
54 message: "Rusty Karmen!".into(),
55 });
56 let response = self.client.clone().ping_pong(request).await?.into_inner();
57 Ok(response.message)
58 }
59 async fn register(&self) -> Result<karmen::Result, Box<dyn std::error::Error>> {
60 let request = tonic::Request::new(karmen::RegisterRequest {
61 name: self.name.clone(),
62 timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64,
63 actions: HashMap::new(),
66 events: HashMap::new(),
67 });
68 let response = self.client.clone().register(request).await?.into_inner();
69 match response.result {
70 Some(r) => {
71 if r.code != 200 {
72 println!("{}", r.code);
73 }
74 Ok(r)
75 }
76 None => Err(Box::new(std::io::Error::new(
77 std::io::ErrorKind::Other,
78 "No response",
79 ))),
80 }
81 }
82 async fn handle_actions(&self) -> Result<(), Box<dyn std::error::Error>> {
83 let name = self.name.clone();
85 let (tx, mut rx): (
86 mpsc::Sender<karmen::ActionResponse>,
87 mpsc::Receiver<karmen::ActionResponse>,
88 ) = mpsc::channel(32);
89 let request = async_stream::stream! {
90 let response = karmen::ActionResponse {
92 hostname: name.clone(),
93 request: None,
95 result: None,
96 };
97 yield response;
98 while let Some(response) = rx.recv().await {
100 yield response;
101 }
102 println!("{}", "Closing stream");
103 };
104 let mut stream = self
105 .client
106 .clone()
107 .action_dispatcher(tonic::Request::new(request))
108 .await?
109 .into_inner();
110 while let Some(req) = stream.message().await? {
111 println!("Got a request to run {:?}", req);
112 let name = self.name.clone();
114 let tx2 = tx.clone();
115 let action = req.clone().action.unwrap();
116 let action_name = action.action_name.clone();
117 let parameters = action.parameters;
118 let action_func = self.actions.lock().await.get(&action_name).unwrap().clone();
120 tokio::spawn(async move {
121 let result = action_func(parameters);
122 let response = karmen::ActionResponse {
124 hostname: name,
125 request: Some(req),
126 result: Some(result),
127 };
128 let _ = tx2.send(response).await;
130 });
131 }
132 Ok(())
133 }
134 async fn add_action(
135 &self,
136 action: fn(parameters: HashMap<String, String>) -> karmen::Result,
137 name: &str,
138 ) -> Result<(), Box<dyn std::error::Error>> {
139 self.actions.lock().await.insert(name.to_string(), action);
140 Ok(())
141 }
142 async fn run_event(
143 &self,
144 name: &str,
145 params: HashMap<String, String>,
146 ) -> Result<karmen::Result, Box<dyn std::error::Error>> {
147 println!("Running event {}", name);
148 let event = Some(karmen::Event {
149 event_name: name.to_string(),
150 timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as i64,
151 parameters: HashMap::new(),
153 });
154 let request = tonic::Request::new(karmen::EventRequest {
155 event: event,
156 parameters: params,
157 requester_name: self.name.clone(),
158 uuid: "".to_string(),
160 });
161 let response = self.client.clone().emit_event(request).await?.into_inner();
162 match response.result {
163 Some(r) => {
164 if r.code != 200 {
165 println!("{}", r.code);
166 }
167 Ok(r)
168 }
169 None => Err(Box::new(std::io::Error::new(
170 std::io::ErrorKind::Other,
171 "No response",
172 ))),
173 }
174 }
175}