/// A named node in a tree of cancellable contexts.
///
/// Root contexts are built with `Context::new`; descendants are built with
/// `Context::child`, which derives the child's cancellation token from this
/// context's token and records the child in `children`.
pub struct Context {
/// Name given at construction; copied into status snapshots.
pub name: String,
/// Instant at which this context was constructed; `status` reports the
/// time elapsed since then.
pub created_at: std::time::Instant,
/// Copy of the context this one was derived from, or `None` for a context
/// built with `Context::new`.
// NOTE(review): this field is written but never read in the visible part of the file.
parent: Option<std::sync::Arc<Context>>,
/// Child contexts registered by `Context::child`. The list sits behind an
/// `Arc`, so clones of a `Context` share one list.
children: std::sync::Arc<std::sync::Mutex<Vec<std::sync::Arc<Context>>>>,
/// Token backing `cancel`, `is_cancelled` and `cancelled`.
cancellation_token: tokio_util::sync::CancellationToken,
}
impl Context {
    /// Creates a root context called `name`.
    ///
    /// The result has no parent, an empty child list and a fresh cancellation
    /// token; `created_at` is set to the current instant.
    pub fn new(name: &str) -> std::sync::Arc<Context> {
        std::sync::Arc::new(Context {
            name: name.to_string(),
            created_at: std::time::Instant::now(),
            parent: None,
            children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            cancellation_token: tokio_util::sync::CancellationToken::new(),
        })
    }

    /// Creates a child context called `name` and registers it with `self`.
    ///
    /// The child's cancellation token is obtained from this context's token
    /// via `child_token()`. The returned [`ContextBuilder`] owns the new
    /// child; call [`ContextBuilder::spawn`] to run a task with it.
    pub fn child(&self, name: &str) -> ContextBuilder {
        let child_context = std::sync::Arc::new(Context {
            name: name.to_string(),
            created_at: std::time::Instant::now(),
            // `&self` gives no access to the `Arc` that owns this context, so
            // the child keeps a clone in a new `Arc`. The clone shares the
            // `children` list and the cancellation token with `self`.
            parent: Some(std::sync::Arc::new(self.clone())),
            children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
            cancellation_token: self.cancellation_token.child_token(),
        });

        // Recover the guard from a poisoned mutex instead of silently skipping
        // registration: a child missing from the list would never show up in
        // `status`.
        self.children
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(std::sync::Arc::clone(&child_context));

        ContextBuilder {
            context: child_context,
        }
    }

    /// Spawns `task` on the Tokio runtime and returns its join handle.
    ///
    /// The task is passed straight to `tokio::spawn`; this method does not
    /// tie it to the context or to its cancellation token.
    pub fn spawn<F>(&self, task: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: std::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        tokio::spawn(task)
    }

    /// Creates a child context called `name` and spawns `task` with it.
    ///
    /// Shorthand for `self.child(name).spawn(task)`: `task` receives the new
    /// child context and the future it returns runs on the Tokio runtime.
    pub fn spawn_child<F, Fut>(&self, name: &str, task: F) -> tokio::task::JoinHandle<Fut::Output>
    where
        F: FnOnce(std::sync::Arc<Context>) -> Fut + Send + 'static,
        Fut: std::future::Future + Send + 'static,
        Fut::Output: Send + 'static,
    {
        self.child(name).spawn(task)
    }

    /// Completes once this context has been cancelled.
    ///
    /// Returns immediately when the context is already cancelled. Otherwise
    /// the caller is suspended on the token's cancellation future rather than
    /// re-polling in a `yield_now` loop, so waiting does not consume CPU.
    pub async fn wait(&self) {
        self.cancellation_token.cancelled().await;
    }

    /// Returns the token's future that resolves when this context is cancelled.
    pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
        self.cancellation_token.cancelled()
    }

    /// Reports whether this context's token has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancellation_token.is_cancelled()
    }

    /// Cancels this context's token.
    pub fn cancel(&self) {
        self.cancellation_token.cancel();
    }

    /// Takes a status snapshot of this context and hands it to
    /// `crate::status::add_persisted_context`.
    pub fn persist(&self) {
        let context_status = self.status();
        crate::status::add_persisted_context(context_status);
    }

    /// Builds a snapshot of this context and, recursively, of every
    /// registered child.
    ///
    /// The snapshot holds the name, the current cancellation flag, the time
    /// elapsed since `created_at`, and one nested snapshot per child.
    pub fn status(&self) -> crate::status::ContextStatus {
        // The guard is held while the children are visited. A poisoned mutex
        // is recovered so the snapshot still lists the registered children
        // instead of reporting none.
        let children: Vec<_> = self
            .children
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|child| child.status())
            .collect();

        crate::status::ContextStatus {
            name: self.name.clone(),
            is_cancelled: self.is_cancelled(),
            duration: self.created_at.elapsed(),
            children,
        }
    }
}
impl Clone for Context {
    /// Produces a context with the same name and creation instant.
    ///
    /// The `parent` and `children` fields are `Arc` handles, so the copy
    /// points at the same parent and the same child list as the original.
    /// The cancellation token is duplicated with its own `clone`.
    fn clone(&self) -> Self {
        let Context {
            name,
            created_at,
            parent,
            children,
            cancellation_token,
        } = self;

        Self {
            name: name.clone(),
            created_at: *created_at,
            parent: parent.clone(),
            children: std::sync::Arc::clone(children),
            cancellation_token: cancellation_token.clone(),
        }
    }
}
/// Holder for a newly created child context, returned by `Context::child`.
///
/// Consumed by `ContextBuilder::spawn`, which hands the context to a task.
pub struct ContextBuilder {
/// The child context that will be passed to the spawned task.
pub(crate) context: std::sync::Arc<Context>,
}
impl ContextBuilder {
pub fn spawn<F, Fut>(self, task: F) -> tokio::task::JoinHandle<Fut::Output>
where
F: FnOnce(std::sync::Arc<Context>) -> Fut + Send + 'static,
Fut: std::future::Future + Send + 'static,
Fut::Output: Send + 'static,
{
let context = self.context;
tokio::spawn(async move { task(context).await })
}
}
/// Process-wide root context named "global".
///
/// Built with `Context::new("global")` the first time the static is accessed.
static GLOBAL_CONTEXT: std::sync::LazyLock<std::sync::Arc<Context>> =
std::sync::LazyLock::new(|| Context::new("global"));
/// Returns a handle to the process-wide root context.
///
/// Each call yields another `Arc` pointing at the same `GLOBAL_CONTEXT`
/// instance.
pub fn global() -> std::sync::Arc<Context> {
    let root: &std::sync::Arc<Context> = &GLOBAL_CONTEXT;
    std::sync::Arc::clone(root)
}