ll/task.rs
1use crate::data::DataValue;
2use crate::task_tree::{TaskTree, TASK_TREE};
3use crate::uniq_id::UniqID;
4use anyhow::Result;
5use std::future::Future;
6use std::sync::Arc;
7
/// Flag stored in `TaskData::mark_done_on_drop`.
///
/// When `true`, dropping the owning `TaskData` reports the task as done to
/// its task tree; when `false`, the drop is silent.
pub type MarkDoneOnDrop = bool;
9
10#[derive(Clone)]
11pub struct Task(pub(crate) Arc<TaskData>);
12
13pub(crate) struct TaskData {
14 pub(crate) id: UniqID,
15 pub(crate) task_tree: Arc<TaskTree>,
16 pub(crate) mark_done_on_drop: MarkDoneOnDrop,
17}
18
19impl Task {
20 pub fn create_new(name: &str) -> Self {
21 let id = TASK_TREE.create_task_internal(name, None);
22 Self(Arc::new(TaskData {
23 id,
24 task_tree: TASK_TREE.clone(),
25 mark_done_on_drop: true,
26 }))
27 }
28
29 pub fn create(&self, name: &str) -> Self {
30 let id = self.0.task_tree.create_task_internal(name, Some(self.0.id));
31 Self(Arc::new(TaskData {
32 id,
33 task_tree: self.0.task_tree.clone(),
34 mark_done_on_drop: true,
35 }))
36 }
37
38 /// Spawn a new top level task, with no parent.
39 /// This should usually be done in the very beginning of
40 /// the process/application.
41 pub async fn spawn_new<F, FT, T>(name: &str, f: F) -> Result<T>
42 where
43 F: FnOnce(Task) -> FT,
44 FT: Future<Output = Result<T>> + Send,
45 T: Send,
46 {
47 TASK_TREE.spawn(name.into(), f, None).await
48 }
49
50 /// Run an async closure as a child task on the current async thread.
51 ///
52 /// The future runs inline — it is `.await`ed directly without creating
53 /// a new tokio task. This means it shares the current tokio task and will
54 /// not make progress unless the caller awaits the returned future.
55 pub async fn spawn<F, FT, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
56 where
57 F: FnOnce(Task) -> FT,
58 FT: Future<Output = Result<T>> + Send,
59 T: Send,
60 {
61 self.0
62 .task_tree
63 .spawn(name.into(), f, Some(self.0.id))
64 .await
65 }
66
67 /// Run an async closure as a child task on a **new tokio task**
68 /// (`tokio::spawn`).
69 ///
70 /// Unlike [`spawn`](Self::spawn), the future runs concurrently on the
71 /// tokio runtime — it does not block the caller's async task. Use this
72 /// when you need true parallelism across multiple async operations without
73 /// manually calling `tokio::spawn` and cloning the parent task handle.
74 ///
75 /// The closure and its return type must be `'static` because they are
76 /// moved into a detached tokio task.
77 ///
78 /// If the spawned task panics, the ll task is marked as failed and the
79 /// panic is returned as an `anyhow::Error`.
80 pub async fn spawn_tokio<F, FT, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
81 where
82 F: FnOnce(Task) -> FT + Send + 'static,
83 FT: Future<Output = Result<T>> + Send + 'static,
84 T: Send + 'static,
85 {
86 self.0
87 .task_tree
88 .spawn_tokio(name.into(), f, Some(self.0.id))
89 .await
90 }
91
92 /// Run a synchronous closure as a child task on the current thread.
93 ///
94 /// The closure runs inline and blocks the current thread until it returns.
95 /// Good for cheap synchronous work. For CPU-heavy or blocking I/O work,
96 /// use [`spawn_blocking`](Self::spawn_blocking) instead to avoid starving
97 /// the async executor.
98 pub fn spawn_sync<F, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
99 where
100 F: FnOnce(Task) -> Result<T>,
101 T: Send,
102 {
103 self.0.task_tree.spawn_sync(name.into(), f, Some(self.0.id))
104 }
105
106 /// Run a synchronous closure as a child task on **tokio's blocking thread
107 /// pool** (`tokio::task::spawn_blocking`).
108 ///
109 /// Use this for CPU-heavy computation or blocking I/O that would otherwise
110 /// stall the async executor thread. The closure runs on a dedicated OS
111 /// thread, and the returned future resolves once it completes.
112 ///
113 /// The closure must be `'static` because it is moved to a separate thread.
114 ///
115 /// If the blocking task panics, the ll task is marked as failed and the
116 /// panic is returned as an `anyhow::Error`.
117 pub async fn spawn_blocking<F, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
118 where
119 F: FnOnce(Task) -> Result<T> + Send + 'static,
120 T: Send + 'static,
121 {
122 self.0
123 .task_tree
124 .spawn_blocking(name.into(), f, Some(self.0.id))
125 .await
126 }
127
128 pub fn data<D: Into<DataValue>>(&self, name: &str, data: D) {
129 self.0.task_tree.add_data(self.0.id, name, data);
130 }
131
132 /// Get a piece of previously set data or transitive data. This can be
133 /// useful if session/request tracking IDs need to be past to other loggers,
134 /// e.g. when shelling out to another process that needs to set the same
135 /// `session_id` inside so we can group the events together.
136 pub fn get_data(&self, name: &str) -> Option<DataValue> {
137 self.0.task_tree.get_data(self.0.id, name)
138 }
139
140 pub fn data_transitive<D: Into<DataValue>>(&self, name: &str, data: D) {
141 self.0
142 .task_tree
143 .add_data_transitive_for_task(self.0.id, name, data);
144 }
145
146 pub fn progress(&self, done: i64, total: i64) {
147 self.0.task_tree.task_progress(self.0.id, done, total);
148 }
149
150 /// Reporters can use this flag to choose to not report errors.
151 /// This is useful for cases where there's a large task chain and every
152 /// single task reports a partial errors (that gets built up with each task)
153 /// It would make sense to report it only once at the top level (thrift
154 /// request, cli call, etc) and only mark other tasks.
155 /// If set to Some, the message inside is what would be reported by default
156 /// instead of reporting errors to avoid confusion (e.g. "error was hidden,
157 /// see ...")
158 /// see [hide_errors_default_msg()](crate::task_tree::TaskTree::hide_errors_default_msg)
159 pub fn hide_error_msg(&self, msg: Option<String>) {
160 let msg = msg.map(Arc::new);
161 self.0.task_tree.hide_error_msg_for_task(self.0.id, msg);
162 }
163
164 /// When errors occur, we attach task data to it in the description.
165 /// If set to false, only task direct data will be attached and not
166 /// transitive data. This is useful sometimes to remove the noise of
167 /// transitive data appearing in every error in the chain (e.g. hostname)
168 /// see [attach_transitive_data_to_errors_default()](crate::task_tree::TaskTree::attach_transitive_data_to_errors_default)
169 pub fn attach_transitive_data_to_errors(&self, val: bool) {
170 self.0
171 .task_tree
172 .attach_transitive_data_to_errors_for_task(self.0.id, val);
173 }
174}
175
176impl Drop for TaskData {
177 fn drop(&mut self) {
178 if self.mark_done_on_drop {
179 self.task_tree.mark_done(self.id, None);
180 }
181 }
182}