Skip to main content

ll/
task.rs

1use crate::data::DataValue;
2use crate::task_tree::{TaskTree, TASK_TREE};
3use crate::uniq_id::UniqID;
4use anyhow::Result;
5use std::future::Future;
6use std::sync::Arc;
7
/// Flag stored in each task's shared data (`TaskData::mark_done_on_drop`).
/// When `true`, dropping that shared data marks the task as done in its
/// task tree.
pub type MarkDoneOnDrop = bool;
9
/// Handle to a single task registered in a `TaskTree`.
///
/// The handle is a thin wrapper around an `Arc<TaskData>`, so cloning it is
/// cheap and every clone refers to the same underlying task. The task's
/// shared data is released (and, if configured, the task is marked done)
/// only once the last reference to it is dropped.
#[derive(Clone)]
pub struct Task(pub(crate) Arc<TaskData>);
12
/// Shared state behind a [`Task`] handle.
pub(crate) struct TaskData {
    /// Identifier of this task, as returned by the tree when it was created.
    pub(crate) id: UniqID,
    /// The tree this task is registered in; all operations on the task are
    /// forwarded to it together with `id`.
    pub(crate) task_tree: Arc<TaskTree>,
    /// Whether dropping this value should mark the task as done in
    /// `task_tree`.
    pub(crate) mark_done_on_drop: MarkDoneOnDrop,
}
18
19impl Task {
20    pub fn create_new(name: &str) -> Self {
21        let id = TASK_TREE.create_task_internal(name, None);
22        Self(Arc::new(TaskData {
23            id,
24            task_tree: TASK_TREE.clone(),
25            mark_done_on_drop: true,
26        }))
27    }
28
29    pub fn create(&self, name: &str) -> Self {
30        let id = self.0.task_tree.create_task_internal(name, Some(self.0.id));
31        Self(Arc::new(TaskData {
32            id,
33            task_tree: self.0.task_tree.clone(),
34            mark_done_on_drop: true,
35        }))
36    }
37
38    /// Spawn a new top level task, with no parent.
39    /// This should usually be done in the very beginning of
40    /// the process/application.
41    pub async fn spawn_new<F, FT, T>(name: &str, f: F) -> Result<T>
42    where
43        F: FnOnce(Task) -> FT,
44        FT: Future<Output = Result<T>> + Send,
45        T: Send,
46    {
47        TASK_TREE.spawn(name.into(), f, None).await
48    }
49
50    /// Run an async closure as a child task on the current async thread.
51    ///
52    /// The future runs inline — it is `.await`ed directly without creating
53    /// a new tokio task. This means it shares the current tokio task and will
54    /// not make progress unless the caller awaits the returned future.
55    pub async fn spawn<F, FT, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
56    where
57        F: FnOnce(Task) -> FT,
58        FT: Future<Output = Result<T>> + Send,
59        T: Send,
60    {
61        self.0
62            .task_tree
63            .spawn(name.into(), f, Some(self.0.id))
64            .await
65    }
66
67    /// Run an async closure as a child task on a **new tokio task**
68    /// (`tokio::spawn`).
69    ///
70    /// Unlike [`spawn`](Self::spawn), the future runs concurrently on the
71    /// tokio runtime — it does not block the caller's async task. Use this
72    /// when you need true parallelism across multiple async operations without
73    /// manually calling `tokio::spawn` and cloning the parent task handle.
74    ///
75    /// The closure and its return type must be `'static` because they are
76    /// moved into a detached tokio task.
77    ///
78    /// If the spawned task panics, the ll task is marked as failed and the
79    /// panic is returned as an `anyhow::Error`.
80    pub async fn spawn_tokio<F, FT, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
81    where
82        F: FnOnce(Task) -> FT + Send + 'static,
83        FT: Future<Output = Result<T>> + Send + 'static,
84        T: Send + 'static,
85    {
86        self.0
87            .task_tree
88            .spawn_tokio(name.into(), f, Some(self.0.id))
89            .await
90    }
91
92    /// Run a synchronous closure as a child task on the current thread.
93    ///
94    /// The closure runs inline and blocks the current thread until it returns.
95    /// Good for cheap synchronous work. For CPU-heavy or blocking I/O work,
96    /// use [`spawn_blocking`](Self::spawn_blocking) instead to avoid starving
97    /// the async executor.
98    pub fn spawn_sync<F, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
99    where
100        F: FnOnce(Task) -> Result<T>,
101        T: Send,
102    {
103        self.0.task_tree.spawn_sync(name.into(), f, Some(self.0.id))
104    }
105
106    /// Run a synchronous closure as a child task on **tokio's blocking thread
107    /// pool** (`tokio::task::spawn_blocking`).
108    ///
109    /// Use this for CPU-heavy computation or blocking I/O that would otherwise
110    /// stall the async executor thread. The closure runs on a dedicated OS
111    /// thread, and the returned future resolves once it completes.
112    ///
113    /// The closure must be `'static` because it is moved to a separate thread.
114    ///
115    /// If the blocking task panics, the ll task is marked as failed and the
116    /// panic is returned as an `anyhow::Error`.
117    pub async fn spawn_blocking<F, T, S: Into<String>>(&self, name: S, f: F) -> Result<T>
118    where
119        F: FnOnce(Task) -> Result<T> + Send + 'static,
120        T: Send + 'static,
121    {
122        self.0
123            .task_tree
124            .spawn_blocking(name.into(), f, Some(self.0.id))
125            .await
126    }
127
128    pub fn data<D: Into<DataValue>>(&self, name: &str, data: D) {
129        self.0.task_tree.add_data(self.0.id, name, data);
130    }
131
132    /// Get a piece of previously set data or transitive data. This can be
133    /// useful if session/request tracking IDs need to be past to other loggers,
134    /// e.g. when shelling out to another process that needs to set the same
135    /// `session_id` inside so we can group the events together.
136    pub fn get_data(&self, name: &str) -> Option<DataValue> {
137        self.0.task_tree.get_data(self.0.id, name)
138    }
139
140    pub fn data_transitive<D: Into<DataValue>>(&self, name: &str, data: D) {
141        self.0
142            .task_tree
143            .add_data_transitive_for_task(self.0.id, name, data);
144    }
145
146    pub fn progress(&self, done: i64, total: i64) {
147        self.0.task_tree.task_progress(self.0.id, done, total);
148    }
149
150    /// Reporters can use this flag to choose to not report errors.
151    /// This is useful for cases where there's a large task chain and every
152    /// single task reports a partial errors (that gets built up with each task)
153    /// It would make sense to report it only once at the top level (thrift
154    /// request, cli call, etc) and only mark other tasks.
155    /// If set to Some, the message inside is what would be reported by default
156    /// instead of reporting errors to avoid confusion (e.g. "error was hidden,
157    /// see ...")
158    /// see [hide_errors_default_msg()](crate::task_tree::TaskTree::hide_errors_default_msg)
159    pub fn hide_error_msg(&self, msg: Option<String>) {
160        let msg = msg.map(Arc::new);
161        self.0.task_tree.hide_error_msg_for_task(self.0.id, msg);
162    }
163
164    /// When errors occur, we attach task data to it in the description.
165    /// If set to false, only task direct data will be attached and not
166    /// transitive data. This is useful sometimes to remove the noise of
167    /// transitive data appearing in every error in the chain (e.g. hostname)
168    /// see [attach_transitive_data_to_errors_default()](crate::task_tree::TaskTree::attach_transitive_data_to_errors_default)
169    pub fn attach_transitive_data_to_errors(&self, val: bool) {
170        self.0
171            .task_tree
172            .attach_transitive_data_to_errors_for_task(self.0.id, val);
173    }
174}
175
176impl Drop for TaskData {
177    fn drop(&mut self) {
178        if self.mark_done_on_drop {
179            self.task_tree.mark_done(self.id, None);
180        }
181    }
182}