use std::any::Any;
use std::sync::{Arc, Mutex};
use crate::frames::{Frame, FrameDirection};
use crate::ravi::models as ravi_models;
pub type PushSender = tokio::sync::mpsc::Sender<(Frame, FrameDirection)>;
#[derive(Clone)]
pub struct DharaContext {
push: Arc<std::sync::OnceLock<PushSender>>,
flow_state: Arc<dyn Any + Send + Sync>,
current_node: Arc<Mutex<String>>,
connection_id: u64,
}
impl DharaContext {
pub fn new(flow_state: Arc<dyn Any + Send + Sync>, connection_id: u64) -> Self {
Self {
push: Arc::new(std::sync::OnceLock::new()),
flow_state,
current_node: Arc::new(Mutex::new(String::new())),
connection_id,
}
}
pub fn set_push_sender(&self, sender: PushSender) {
let _ = self.push.set(sender);
}
pub(crate) fn push_arc(&self) -> &Arc<std::sync::OnceLock<PushSender>> {
&self.push
}
pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
self.flow_state.downcast_ref::<T>()
}
pub fn state_arc(&self) -> &Arc<dyn Any + Send + Sync> {
&self.flow_state
}
pub fn current_node(&self) -> String {
self.current_node.lock().unwrap().clone()
}
pub(crate) fn set_current_node(&self, name: &str) {
*self.current_node.lock().unwrap() = name.to_string();
}
pub fn connection_id(&self) -> u64 {
self.connection_id
}
pub async fn push_frame(&self, frame: Frame, direction: FrameDirection) -> bool {
if let Some(tx) = self.push.get() {
tx.send((frame, direction)).await.is_ok()
} else {
log::warn!("[conn={}] DharaContext: push sender not yet initialized", self.connection_id);
false
}
}
pub async fn push_downstream(&self, frame: Frame) -> bool {
self.push_frame(frame, FrameDirection::Downstream).await
}
pub async fn push_upstream(&self, frame: Frame) -> bool {
self.push_frame(frame, FrameDirection::Upstream).await
}
pub async fn push_ravi_message(&self, data: serde_json::Value) -> bool {
let payload = ravi_models::msg_server_message(data);
let frame = Frame::ravi_server_message(payload);
self.push_downstream(frame).await
}
}