use crate::utils::Config;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
#[derive(Clone, Default)]
pub struct CancelToken {
inner: Arc<CancelInner>,
}
#[derive(Default)]
struct CancelInner {
cancelled: AtomicBool,
notify: Notify,
}
impl CancelToken {
pub fn new() -> Self {
Self::default()
}
pub fn cancel(&self) {
self.inner.cancelled.store(true, Ordering::SeqCst);
self.inner.notify.notify_waiters();
}
pub fn is_cancelled(&self) -> bool {
self.inner.cancelled.load(Ordering::SeqCst)
}
pub async fn cancelled(&self) {
if self.is_cancelled() {
return;
}
let notified = self.inner.notify.notified();
if self.is_cancelled() {
return;
}
notified.await;
}
}
pub struct Context {
pub config: Arc<Config>,
pub run_id: String,
blackboard: RwLock<HashMap<String, serde_json::Value>>,
events: RwLock<HashMap<String, Arc<Notify>>>,
cancel: CancelToken,
}
impl Context {
pub fn new(config: Arc<Config>) -> Self {
Context {
config,
run_id: uuid::Uuid::new_v4().to_string(),
blackboard: RwLock::new(HashMap::new()),
events: RwLock::new(HashMap::new()),
cancel: CancelToken::new(),
}
}
pub fn for_tests() -> Arc<Self> {
Arc::new(Context::new(Arc::new(Config::default())))
}
pub fn cancel_token(&self) -> &CancelToken {
&self.cancel
}
pub fn cancel(&self) {
self.cancel.cancel();
}
pub fn is_cancelled(&self) -> bool {
self.cancel.is_cancelled()
}
pub fn set(&self, key: impl Into<String>, value: serde_json::Value) {
self.blackboard.write().insert(key.into(), value);
}
pub fn get(&self, key: &str) -> Option<serde_json::Value> {
self.blackboard.read().get(key).cloned()
}
pub fn get_as<T: serde::de::DeserializeOwned>(&self, key: &str) -> Option<T> {
self.get(key).and_then(|v| serde_json::from_value(v).ok())
}
fn event_handle(&self, name: &str) -> Arc<Notify> {
if let Some(n) = self.events.read().get(name) {
return n.clone();
}
let mut w = self.events.write();
w.entry(name.to_string())
.or_insert_with(|| Arc::new(Notify::new()))
.clone()
}
pub fn emit(&self, name: &str) {
self.event_handle(name).notify_waiters();
}
pub async fn wait_for(&self, name: &str) -> bool {
let notify = self.event_handle(name);
let notified = notify.notified();
tokio::select! {
_ = notified => true,
_ = self.cancel.cancelled() => false,
}
}
}