Skip to main content

daimon_core/
distributed.rs

1//! Core types and trait for distributed agent execution.
2//!
3//! Provider crates implement [`TaskBroker`] for their cloud-native message
4//! service. The main `daimon` crate re-exports everything from here.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::Result;
13
14/// A unit of work submitted to a [`TaskBroker`].
15///
16/// Each task carries a unique ID and the input text for an agent prompt.
17/// Optional metadata lets callers tag tasks with routing hints, priority,
18/// or any application-specific data.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct AgentTask {
21    /// Unique identifier for this task (generated on creation).
22    pub task_id: String,
23    /// The user input to prompt the agent with.
24    pub input: String,
25    /// Optional run ID for resumable execution via checkpoints.
26    pub run_id: Option<String>,
27    /// Arbitrary key-value metadata (routing hints, priority, etc.).
28    pub metadata: HashMap<String, serde_json::Value>,
29}
30
31impl AgentTask {
32    /// Creates a new task with a timestamp-based ID.
33    pub fn new(input: impl Into<String>) -> Self {
34        Self {
35            task_id: Self::generate_id(),
36            input: input.into(),
37            run_id: None,
38            metadata: HashMap::new(),
39        }
40    }
41
42    /// Assigns a checkpoint run ID for resumable execution.
43    pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
44        self.run_id = Some(run_id.into());
45        self
46    }
47
48    /// Adds a metadata key-value pair.
49    pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
50        self.metadata.insert(key.into(), value);
51        self
52    }
53
54    fn generate_id() -> String {
55        use std::time::{SystemTime, UNIX_EPOCH};
56        let ts = SystemTime::now()
57            .duration_since(UNIX_EPOCH)
58            .unwrap_or_default()
59            .as_nanos();
60        format!("task-{ts:x}")
61    }
62}
63
64/// The result of a completed agent task.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct TaskResult {
67    /// The task ID this result corresponds to.
68    pub task_id: String,
69    /// The agent's final text output.
70    pub output: String,
71    /// Number of ReAct iterations the agent performed.
72    pub iterations: usize,
73    /// Estimated cost in USD (if a cost model was configured).
74    pub cost: f64,
75    /// Error message if the task failed.
76    pub error: Option<String>,
77}
78
79/// Current status of a distributed task.
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum TaskStatus {
82    /// Submitted but not yet picked up by a worker.
83    Pending,
84    /// Currently being executed by a worker.
85    Running,
86    /// Completed successfully.
87    Completed(TaskResult),
88    /// Failed with an error.
89    Failed(String),
90}
91
92/// Trait for distributing agent tasks across workers.
93///
94/// Implement this for your message broker (AWS SQS, Google Pub/Sub,
95/// Azure Service Bus, Redis, NATS, RabbitMQ, etc.) to enable
96/// multi-process agent execution.
97pub trait TaskBroker: Send + Sync {
98    /// Submits a task for execution. Returns the task ID.
99    fn submit(&self, task: AgentTask) -> impl Future<Output = Result<String>> + Send;
100
101    /// Queries the current status of a task.
102    fn status(&self, task_id: &str) -> impl Future<Output = Result<TaskStatus>> + Send;
103
104    /// Blocks until a task is available and returns it.
105    /// Returns `None` if the broker is closed.
106    fn receive(&self) -> impl Future<Output = Result<Option<AgentTask>>> + Send;
107
108    /// Marks a task as completed with the given result.
109    fn complete(&self, task_id: &str, result: TaskResult) -> impl Future<Output = Result<()>> + Send;
110
111    /// Marks a task as failed with an error message.
112    fn fail(&self, task_id: &str, error: String) -> impl Future<Output = Result<()>> + Send;
113}
114
115/// Object-safe wrapper for [`TaskBroker`], enabling `Arc<dyn ErasedTaskBroker>`.
116pub trait ErasedTaskBroker: Send + Sync {
117    fn submit_erased<'a>(
118        &'a self,
119        task: AgentTask,
120    ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'a>>;
121
122    fn status_erased<'a>(
123        &'a self,
124        task_id: &'a str,
125    ) -> Pin<Box<dyn Future<Output = Result<TaskStatus>> + Send + 'a>>;
126
127    fn receive_erased(&self) -> Pin<Box<dyn Future<Output = Result<Option<AgentTask>>> + Send + '_>>;
128
129    fn complete_erased<'a>(
130        &'a self,
131        task_id: &'a str,
132        result: TaskResult,
133    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
134
135    fn fail_erased<'a>(
136        &'a self,
137        task_id: &'a str,
138        error: String,
139    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
140}
141
142impl<T: TaskBroker> ErasedTaskBroker for T {
143    fn submit_erased<'a>(
144        &'a self,
145        task: AgentTask,
146    ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'a>> {
147        Box::pin(self.submit(task))
148    }
149
150    fn status_erased<'a>(
151        &'a self,
152        task_id: &'a str,
153    ) -> Pin<Box<dyn Future<Output = Result<TaskStatus>> + Send + 'a>> {
154        Box::pin(self.status(task_id))
155    }
156
157    fn receive_erased(&self) -> Pin<Box<dyn Future<Output = Result<Option<AgentTask>>> + Send + '_>> {
158        Box::pin(self.receive())
159    }
160
161    fn complete_erased<'a>(
162        &'a self,
163        task_id: &'a str,
164        result: TaskResult,
165    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
166        Box::pin(self.complete(task_id, result))
167    }
168
169    fn fail_erased<'a>(
170        &'a self,
171        task_id: &'a str,
172        error: String,
173    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
174        Box::pin(self.fail(task_id, error))
175    }
176}