vibe_code/
task.rs

1//! Defines the core `Task` abstraction and related components.
2//!
3//! This module provides the fundamental building blocks for defining and managing
4//! units of work within the system. It includes the `Task` struct itself, which
5//! wraps a user's function, and the `TaskHandle`, which is returned to the user
6//! to get the final result.
7
8use std::fmt;
9use std::panic::{AssertUnwindSafe, catch_unwind};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::mpsc;
12
13/// A unique identifier for a `Task`.
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
15pub struct TaskId(u64);
16
17/// An atomic counter to generate unique task IDs.
18static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(1);
19
20impl TaskId {
21    /// Creates a new, unique `TaskId`.
22    pub fn new() -> Self {
23        TaskId(NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed))
24    }
25
26    /// Returns the inner `u64` value of the `TaskId`.
27    pub fn value(&self) -> u64 {
28        self.0
29    }
30}
31
32impl Default for TaskId {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl fmt::Display for TaskId {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        write!(f, "Task({})", self.0)
41    }
42}
43
44/// Defines the priority level of a task.
45///
46/// This can be used by the system to prioritize certain tasks over others,
47/// although it is not heavily used by the simple `VibeSystem` API.
48#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
49pub enum Priority {
50    Low,
51    #[default]
52    Normal,
53    High,
54}
55
56/// Represents errors that can occur during a task's execution.
57#[derive(Debug)]
58pub enum TaskError {
59    /// The task's function panicked.
60    Panicked(String),
61    /// The task's function returned an error.
62    ExecutionFailed(Box<dyn std::error::Error + Send + Sync + 'static>),
63    /// The task timed out.
64    TimedOut,
65}
66
67impl fmt::Display for TaskError {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        match self {
70            TaskError::Panicked(msg) => write!(f, "Task panicked: {msg}"),
71            TaskError::ExecutionFailed(err) => write!(f, "Task execution failed: {err}"),
72            TaskError::TimedOut => write!(f, "Operation timed out"),
73        }
74    }
75}
76
77impl std::error::Error for TaskError {
78    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
79        match self {
80            TaskError::ExecutionFailed(err) => Some(err.as_ref()),
81            _ => None,
82        }
83    }
84}
85
86/// Converts a panic payload into a readable string.
87fn panic_payload_to_string(payload: Box<dyn std::any::Any + Send>) -> String {
88    if let Some(s) = payload.downcast_ref::<&str>() {
89        s.to_string()
90    } else if let Some(s) = payload.downcast_ref::<String>() {
91        s.clone()
92    } else {
93        "Unknown panic payload type".to_string()
94    }
95}
96
97/// A handle to a task that has been submitted to the system.
98///
99/// This is the public-facing handle (wrapped by `Job`) that allows a user
100/// to retrieve the result of their function once it's complete.
101#[derive(Debug)]
102pub struct TaskHandle<Output> {
103    pub task_id: TaskId,
104    result_receiver: mpsc::Receiver<Result<Output, TaskError>>,
105}
106
107impl<Output> TaskHandle<Output> {
108    /// Creates a new `TaskHandle` linked to a result channel. (Internal use)
109    pub(crate) fn new(
110        task_id: TaskId,
111        result_receiver: mpsc::Receiver<Result<Output, TaskError>>,
112    ) -> Self {
113        Self {
114            task_id,
115            result_receiver,
116        }
117    }
118
119    /// Returns the unique ID of the associated task.
120    pub fn get_task_id(&self) -> TaskId {
121        self.task_id
122    }
123
124    /// Waits (blocks) until the task's result is available and returns it.
125    pub fn recv_result(&self) -> Result<Result<Output, TaskError>, mpsc::RecvError> {
126        self.result_receiver.recv()
127    }
128
129    /// Attempts to receive the result without blocking.
130    pub fn try_recv_result(&self) -> Result<Result<Output, TaskError>, mpsc::TryRecvError> {
131        self.result_receiver.try_recv()
132    }
133}
134
135/// An internal enum representing the outcome of a task's execution.
136///
137/// This is used by `VibeNode` workers for metrics and logging, distinguishing
138/// between a successful run, a logical failure, or a panic.
139#[derive(Debug, Clone, Copy, PartialEq, Eq)]
140pub enum TaskExecutionOutcome {
141    /// The task completed successfully and the result was sent.
142    Success,
143    /// The task returned a `TaskError`.
144    LogicError,
145    /// The result could not be sent because the receiver was dropped.
146    ResultSendFailed,
147    /// The task's function panicked.
148    Panicked,
149}
150
151/// An internal representation of a unit of work.
152///
153/// This struct wraps the user's closure in a type-erased `Box<dyn FnOnce...>`,
154/// allowing different kinds of functions to be stored and executed by workers.
155pub struct Task {
156    pub id: TaskId,
157    pub priority: Priority,
158    pub estimated_cost: u32,
159    runnable: Box<dyn FnOnce() -> TaskExecutionOutcome + Send + 'static>,
160}
161
162impl Task {
163    /// Creates a new `Task` from a user-provided function.
164    ///
165    /// This wraps the function in a new closure that handles panic catching
166    /// and sending the result back over a channel.
167    pub fn new_for_cpu<F, Output>(
168        priority: Priority,
169        estimated_cost: u32,
170        work_fn: F,
171        result_tx: mpsc::Sender<Result<Output, TaskError>>,
172    ) -> Self
173    where
174        F: FnOnce() -> Result<Output, TaskError> + Send + 'static,
175        Output: Send + 'static,
176    {
177        let task_id = TaskId::new();
178
179        let runnable = Box::new(move || {
180            // Execute the user's function, catching any panics.
181            let task_result = match catch_unwind(AssertUnwindSafe(work_fn)) {
182                Ok(result) => result,
183                Err(panic_payload) => {
184                    Err(TaskError::Panicked(panic_payload_to_string(panic_payload)))
185                }
186            };
187
188            // Determine the internal outcome for metrics.
189            let outcome_before_send = match &task_result {
190                Ok(_) => TaskExecutionOutcome::Success,
191                Err(TaskError::Panicked(_)) => TaskExecutionOutcome::Panicked,
192                Err(TaskError::ExecutionFailed(_)) => TaskExecutionOutcome::LogicError,
193                Err(TaskError::TimedOut) => TaskExecutionOutcome::LogicError,
194            };
195
196            // Send the result back to the user.
197            match result_tx.send(task_result) {
198                Ok(_) => outcome_before_send,
199                Err(_) => TaskExecutionOutcome::ResultSendFailed,
200            }
201        });
202
203        Task {
204            id: task_id,
205            priority,
206            estimated_cost: estimated_cost.clamp(1, 100),
207            runnable,
208        }
209    }
210
211    /// Executes the wrapped user function.
212    ///
213    /// This is called by the `VibeNode` worker thread.
214    pub fn run(self) -> TaskExecutionOutcome {
215        (self.runnable)()
216    }
217}
218
219impl fmt::Debug for Task {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("Task")
222            .field("id", &self.id)
223            .field("priority", &self.priority)
224            .field("estimated_cost", &self.estimated_cost)
225            .field("runnable", &"<Box<dyn FnOnce() -> TaskExecutionOutcome>>")
226            .finish()
227    }
228}