daimon_core/
distributed.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::Result;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct AgentTask {
21 pub task_id: String,
23 pub input: String,
25 pub run_id: Option<String>,
27 pub metadata: HashMap<String, serde_json::Value>,
29}
30
31impl AgentTask {
32 pub fn new(input: impl Into<String>) -> Self {
34 Self {
35 task_id: Self::generate_id(),
36 input: input.into(),
37 run_id: None,
38 metadata: HashMap::new(),
39 }
40 }
41
42 pub fn with_run_id(mut self, run_id: impl Into<String>) -> Self {
44 self.run_id = Some(run_id.into());
45 self
46 }
47
48 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
50 self.metadata.insert(key.into(), value);
51 self
52 }
53
54 fn generate_id() -> String {
55 use std::time::{SystemTime, UNIX_EPOCH};
56 let ts = SystemTime::now()
57 .duration_since(UNIX_EPOCH)
58 .unwrap_or_default()
59 .as_nanos();
60 format!("task-{ts:x}")
61 }
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct TaskResult {
67 pub task_id: String,
69 pub output: String,
71 pub iterations: usize,
73 pub cost: f64,
75 pub error: Option<String>,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub enum TaskStatus {
82 Pending,
84 Running,
86 Completed(TaskResult),
88 Failed(String),
90}
91
92pub trait TaskBroker: Send + Sync {
98 fn submit(&self, task: AgentTask) -> impl Future<Output = Result<String>> + Send;
100
101 fn status(&self, task_id: &str) -> impl Future<Output = Result<TaskStatus>> + Send;
103
104 fn receive(&self) -> impl Future<Output = Result<Option<AgentTask>>> + Send;
107
108 fn complete(&self, task_id: &str, result: TaskResult) -> impl Future<Output = Result<()>> + Send;
110
111 fn fail(&self, task_id: &str, error: String) -> impl Future<Output = Result<()>> + Send;
113}
114
115pub trait ErasedTaskBroker: Send + Sync {
117 fn submit_erased<'a>(
118 &'a self,
119 task: AgentTask,
120 ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'a>>;
121
122 fn status_erased<'a>(
123 &'a self,
124 task_id: &'a str,
125 ) -> Pin<Box<dyn Future<Output = Result<TaskStatus>> + Send + 'a>>;
126
127 fn receive_erased(&self) -> Pin<Box<dyn Future<Output = Result<Option<AgentTask>>> + Send + '_>>;
128
129 fn complete_erased<'a>(
130 &'a self,
131 task_id: &'a str,
132 result: TaskResult,
133 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
134
135 fn fail_erased<'a>(
136 &'a self,
137 task_id: &'a str,
138 error: String,
139 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
140}
141
142impl<T: TaskBroker> ErasedTaskBroker for T {
143 fn submit_erased<'a>(
144 &'a self,
145 task: AgentTask,
146 ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + 'a>> {
147 Box::pin(self.submit(task))
148 }
149
150 fn status_erased<'a>(
151 &'a self,
152 task_id: &'a str,
153 ) -> Pin<Box<dyn Future<Output = Result<TaskStatus>> + Send + 'a>> {
154 Box::pin(self.status(task_id))
155 }
156
157 fn receive_erased(&self) -> Pin<Box<dyn Future<Output = Result<Option<AgentTask>>> + Send + '_>> {
158 Box::pin(self.receive())
159 }
160
161 fn complete_erased<'a>(
162 &'a self,
163 task_id: &'a str,
164 result: TaskResult,
165 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
166 Box::pin(self.complete(task_id, result))
167 }
168
169 fn fail_erased<'a>(
170 &'a self,
171 task_id: &'a str,
172 error: String,
173 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
174 Box::pin(self.fail(task_id, error))
175 }
176}