1pub struct Context {
3 pub name: String,
5
6 pub created_at: std::time::Instant,
8
9 parent: Option<std::sync::Arc<Context>>,
11
12 children: std::sync::Arc<std::sync::Mutex<Vec<std::sync::Arc<Context>>>>,
14
15 cancellation_token: tokio_util::sync::CancellationToken,
17}
18
19impl Context {
20 pub fn new(name: &str) -> std::sync::Arc<Context> {
22 std::sync::Arc::new(Context {
23 name: name.to_string(),
24 created_at: std::time::Instant::now(),
25 parent: None,
26 children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
27 cancellation_token: tokio_util::sync::CancellationToken::new(),
28 })
29 }
30
31 pub fn child(&self, name: &str) -> ContextBuilder {
33 let child_context = std::sync::Arc::new(Context {
34 name: name.to_string(),
35 created_at: std::time::Instant::now(),
36 parent: Some(std::sync::Arc::new(self.clone())),
37 children: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
38 cancellation_token: self.cancellation_token.child_token(),
39 });
40
41 if let Ok(mut children) = self.children.lock() {
43 children.push(child_context.clone());
44 }
45
46 ContextBuilder {
47 context: child_context,
48 }
49 }
50
51 pub fn spawn<F>(&self, task: F) -> tokio::task::JoinHandle<F::Output>
53 where
54 F: std::future::Future + Send + 'static,
55 F::Output: Send + 'static,
56 {
57 tokio::spawn(task)
58 }
59
60 pub fn spawn_child<F, Fut>(&self, name: &str, task: F) -> tokio::task::JoinHandle<Fut::Output>
62 where
63 F: FnOnce(std::sync::Arc<Context>) -> Fut + Send + 'static,
64 Fut: std::future::Future + Send + 'static,
65 Fut::Output: Send + 'static,
66 {
67 let child_ctx = self.child(name);
68 child_ctx.spawn(task)
69 }
70
71 pub async fn wait(&self) {
73 loop {
75 if self.is_cancelled() {
76 return;
77 }
78 tokio::task::yield_now().await;
80 }
81 }
82
83 pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
85 self.cancellation_token.cancelled()
86 }
87
88 pub fn is_cancelled(&self) -> bool {
90 self.cancellation_token.is_cancelled()
91 }
92
93 pub fn cancel(&self) {
95 self.cancellation_token.cancel();
96 }
97
98 pub fn persist(&self) {
100 let context_status = self.status();
101 crate::status::add_persisted_context(context_status);
102 }
103
104 pub fn status(&self) -> crate::status::ContextStatus {
106 let children = if let Ok(children_lock) = self.children.lock() {
107 children_lock.iter().map(|child| child.status()).collect()
108 } else {
109 Vec::new()
110 };
111
112 crate::status::ContextStatus {
113 name: self.name.clone(),
114 is_cancelled: self.is_cancelled(),
115 duration: self.created_at.elapsed(),
116 children,
117 }
118 }
119}
120
121impl Clone for Context {
122 fn clone(&self) -> Self {
123 Context {
124 name: self.name.clone(),
125 created_at: self.created_at,
126 parent: self.parent.clone(),
127 children: self.children.clone(),
128 cancellation_token: self.cancellation_token.clone(),
129 }
130 }
131}
132
133pub struct ContextBuilder {
135 pub(crate) context: std::sync::Arc<Context>,
136}
137
138impl ContextBuilder {
139 pub fn spawn<F, Fut>(self, task: F) -> tokio::task::JoinHandle<Fut::Output>
141 where
142 F: FnOnce(std::sync::Arc<Context>) -> Fut + Send + 'static,
143 Fut: std::future::Future + Send + 'static,
144 Fut::Output: Send + 'static,
145 {
146 let context = self.context;
147 tokio::spawn(async move { task(context).await })
148 }
149}
150
151static GLOBAL_CONTEXT: std::sync::LazyLock<std::sync::Arc<Context>> =
153 std::sync::LazyLock::new(|| Context::new("global"));
154
155pub fn global() -> std::sync::Arc<Context> {
157 GLOBAL_CONTEXT.clone()
158}