use futures::StreamExt;
use serde_json::from_str;
use std::collections::HashMap;
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use std::task::Waker;
use tokio::sync::mpsc::Receiver;
/// Requests sent to the reactor over its internal channel.
#[derive(Debug, Clone)]
pub enum Event {
/// Asks the reactor to dispatch an activity and to wake the requesting task
/// once a response naming the same node arrives.
Activity {
/// Key under which the waker is stored; also the payload published on
/// the `activity.execute` subject.
node_id: String,
/// Identifier of the activity; carried with the event but not read by
/// `register_event` in this file.
activity_id: String,
/// Waker of the task waiting on this activity; woken when a message on
/// `activity.response` decodes to `node_id`.
waker: Waker,
},
}
/// Bridges locally submitted [`Event`]s and activity responses received over NATS.
pub struct Reactor {
/// Wakers of pending activities, keyed by node id. An entry is inserted when
/// an event is registered and removed when the matching response arrives.
pub events: Arc<Mutex<HashMap<String, Waker>>>,
}
impl Reactor {
    /// Creates a reactor with an empty waker registry.
    pub fn new() -> Self {
        Reactor {
            events: Default::default(),
        }
    }

    /// Runs the reactor until both of its worker tasks have finished.
    ///
    /// Connects to the NATS server at `127.0.0.1:4222`, subscribes to
    /// `activity.response`, and then drives two tasks:
    ///
    /// * an async task that drains `internal_rx` and registers every received
    ///   [`Event`] via `register_event`; it ends when the channel is closed;
    /// * a blocking task that reads the subscription and wakes the waker stored
    ///   for the node id carried by each response; it ends when the
    ///   subscription stops yielding messages.
    ///
    /// Failures inside the tasks (an event that cannot be registered, a response
    /// that cannot be decoded) are reported on stderr and do not stop the loops.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if connecting to NATS or subscribing fails.
    ///
    /// # Panics
    ///
    /// The worker tasks panic if the waker registry mutex is poisoned; such a
    /// panic is captured by the task's join handle and discarded.
    pub async fn run(self, mut internal_rx: Receiver<Event>) -> Result<(), std::io::Error> {
        let nats_client = nats::connect("127.0.0.1:4222")?;
        let mut response_sub = nats_client.subscribe("activity.response")?;
        let event_collection = self.events.clone();
        let response_collection = self.events.clone();
        let client_clone = nats_client.clone();

        let internal_handle = tokio::task::spawn(async move {
            while let Some(event) = internal_rx.recv().await {
                // A failed registration must not stop the loop, but it should not
                // vanish silently either: the requesting task may never be woken.
                if let Err(err) =
                    register_event(event_collection.clone(), event, &client_clone).await
                {
                    eprintln!("reactor: failed to register activity event: {}", err);
                }
            }
        });

        // The subscription's `next()` is a synchronous, blocking call. Running it
        // on the blocking pool keeps it from stalling an async worker thread.
        let external_handle = tokio::task::spawn_blocking(move || {
            while let Some(msg) = response_sub.next() {
                // The payload is expected to be a JSON-encoded string holding the
                // node id. Skip anything else instead of killing the loop.
                let decoded = from_utf8(&msg.data)
                    .ok()
                    .and_then(|text| from_str::<String>(text).ok());
                let node_id = match decoded {
                    Some(node_id) => node_id,
                    None => {
                        eprintln!("reactor: ignoring malformed activity response");
                        continue;
                    }
                };

                // Take the waker out in a single statement so the lock is released
                // before `wake()` runs.
                let pending = response_collection
                    .lock()
                    .expect("Locking failed")
                    .remove(&node_id);
                if let Some(waker) = pending {
                    waker.wake();
                }
            }
        });

        let _ = (internal_handle.await, external_handle.await);
        Ok(())
    }
}
/// Stores the waker carried by `event` and publishes the activity request.
///
/// For [`Event::Activity`], the waker is inserted into `event_collection`
/// under `node_id` (replacing any waker already stored for that key), and
/// `node_id` is then published on the `activity.execute` subject. The
/// `activity_id` field is not used.
///
/// # Errors
///
/// Returns the I/O error reported by `publish`. The waker stays registered
/// in that case.
///
/// # Panics
///
/// Panics if the mutex guarding `event_collection` is poisoned.
async fn register_event(
    event_collection: Arc<Mutex<HashMap<String, Waker>>>,
    event: Event,
    nats_client: &nats::Connection,
) -> Result<(), std::io::Error> {
    match event {
        Event::Activity {
            node_id,
            activity_id: _,
            waker,
        } => {
            // Register before publishing so a response that arrives immediately
            // still finds the waker. The guard is a temporary, so the lock is
            // released at the end of this statement. The waker is owned here and
            // is moved into the map rather than cloned.
            event_collection
                .lock()
                .expect("Locking failed")
                .insert(node_id.clone(), waker);
            nats_client.publish("activity.execute", &node_id)?;
        }
    }
    Ok(())
}