// dasein_agentic_core/distributed/executor.rs
use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::HashSet;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
17use dasein_agentic_llm::{
18 AnthropicAdapter, GeminiAdapter, LLMAdapter, LLMError, LLMMessage, OpenAIAdapter,
19};
20
21use super::config::{Capability, ExecutorConfig, LLMConfig, SandboxConfig};
22
/// Lifecycle state of an executor.
///
/// Serializable so state can be reported/persisted across the distributed system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutorState {
    /// Ready to accept a task.
    Idle,
    /// Currently running a task.
    Busy {
        /// Identifier of the task being executed.
        task_id: String,
        /// When execution began (UTC).
        started_at: DateTime<Utc>,
    },
    /// Temporarily lent to another supervisor under a lease.
    Borrowed {
        /// Supervisor currently holding the lease.
        to_supervisor: String,
        /// Identifier of the lease.
        lease_id: String,
        /// Lease expiry time (UTC). NOTE(review): expiry enforcement is not
        /// visible in this file — presumably handled by the lease manager.
        expires_at: DateTime<Utc>,
    },
    /// Shut down; no longer accepting work.
    Stopped,
}
42
/// A worker that runs LLM-backed tasks on behalf of a supervisor.
///
/// State and counters live behind `Arc<RwLock<…>>` so they can be shared and
/// mutated through `&self` across async tasks.
#[derive(Debug)]
pub struct Executor {
    /// Unique executor identifier.
    pub id: String,
    /// Supervisor that owns this executor.
    pub owner: String,
    /// Supervisor currently directing this executor
    /// (differs from `owner` while the executor is borrowed).
    pub current_supervisor: String,
    /// Shared lifecycle state (see `ExecutorState`).
    state: Arc<RwLock<ExecutorState>>,
    /// Provider, model, and generation settings used by `execute`.
    llm_config: LLMConfig,
    /// Capabilities this executor advertises.
    capabilities: HashSet<Capability>,
    /// Running total of successfully completed tasks.
    tasks_completed: Arc<RwLock<u64>>,
    /// Running total of failed tasks.
    tasks_failed: Arc<RwLock<u64>>,
}
62
63impl Executor {
64 pub fn new(id: impl Into<String>, owner: impl Into<String>) -> ExecutorBuilder {
74 ExecutorBuilder::new(id.into(), owner.into())
75 }
76
77 pub fn from_config(config: ExecutorConfig) -> Self {
79 Self {
80 id: config.id,
81 owner: config.owner_supervisor.clone(),
82 current_supervisor: config.owner_supervisor,
83 state: Arc::new(RwLock::new(ExecutorState::Idle)),
84 llm_config: config.llm,
85 capabilities: config.capabilities,
86 tasks_completed: Arc::new(RwLock::new(0)),
87 tasks_failed: Arc::new(RwLock::new(0)),
88 }
89 }
90
91 pub async fn state(&self) -> ExecutorState {
93 self.state.read().await.clone()
94 }
95
96 pub async fn is_idle(&self) -> bool {
98 matches!(*self.state.read().await, ExecutorState::Idle)
99 }
100
101 pub async fn is_borrowed(&self) -> bool {
103 matches!(*self.state.read().await, ExecutorState::Borrowed { .. })
104 }
105
106 pub async fn set_busy(&self, task_id: String) {
108 *self.state.write().await = ExecutorState::Busy {
109 task_id,
110 started_at: Utc::now(),
111 };
112 }
113
114 pub async fn set_idle(&self) {
116 *self.state.write().await = ExecutorState::Idle;
117 }
118
119 pub async fn set_borrowed(
121 &self,
122 to_supervisor: String,
123 lease_id: String,
124 expires_at: DateTime<Utc>,
125 ) {
126 *self.state.write().await = ExecutorState::Borrowed {
129 to_supervisor,
130 lease_id,
131 expires_at,
132 };
133 }
134
135 pub async fn return_to_owner(&mut self) {
137 self.current_supervisor = self.owner.clone();
138 *self.state.write().await = ExecutorState::Idle;
139 }
140
141 pub async fn increment_completed(&self) {
143 *self.tasks_completed.write().await += 1;
144 }
145
146 pub async fn increment_failed(&self) {
148 *self.tasks_failed.write().await += 1;
149 }
150
151 pub fn capabilities(&self) -> &HashSet<Capability> {
153 &self.capabilities
154 }
155
156 pub fn has_capability(&self, cap: Capability) -> bool {
158 self.capabilities.contains(&cap)
159 }
160
161 pub fn llm_config(&self) -> &LLMConfig {
163 &self.llm_config
164 }
165
166 pub async fn execute(
179 &self,
180 system_prompt: &str,
181 user_prompt: &str,
182 ) -> Result<ExecutionResult, ExecutionError> {
183 let task_id = format!("task-{}", uuid::Uuid::new_v4());
184
185 self.set_busy(task_id.clone()).await;
187
188 let start = std::time::Instant::now();
189
190 let result = match self.llm_config.provider.as_str() {
192 "gemini" => {
193 let adapter = GeminiAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
194 .with_temperature(self.llm_config.temperature)
195 .with_max_tokens(self.llm_config.max_tokens);
196
197 let messages = vec![
198 LLMMessage::system(system_prompt),
199 LLMMessage::user(user_prompt),
200 ];
201
202 adapter.generate(&messages).await
203 }
204 "openai" => {
205 let adapter = OpenAIAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
206 .with_temperature(self.llm_config.temperature)
207 .with_max_tokens(self.llm_config.max_tokens);
208
209 let messages = vec![
210 LLMMessage::system(system_prompt),
211 LLMMessage::user(user_prompt),
212 ];
213
214 adapter.generate(&messages).await
215 }
216 "anthropic" => {
217 let adapter =
218 AnthropicAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
219 .with_temperature(self.llm_config.temperature)
220 .with_max_tokens(self.llm_config.max_tokens);
221
222 let messages = vec![
223 LLMMessage::system(system_prompt),
224 LLMMessage::user(user_prompt),
225 ];
226
227 adapter.generate(&messages).await
228 }
229 other => {
230 self.set_idle().await;
231 return Err(ExecutionError::UnsupportedProvider(other.to_string()));
232 }
233 };
234
235 let duration = start.elapsed();
236
237 match result {
238 Ok(response) => {
239 self.increment_completed().await;
240 self.set_idle().await;
241
242 let truncated = response.finish_reason == dasein_agentic_llm::FinishReason::Length;
244 if truncated {
245 tracing::warn!(
246 "LLM output truncated (hit max_tokens limit). Model: {}, tokens: {}",
247 response.model,
248 response.tokens_used.total
249 );
250 }
251
252 Ok(ExecutionResult {
253 task_id,
254 executor_id: self.id.clone(),
255 content: response.content,
256 tokens_used: response.tokens_used.total,
257 duration_ms: duration.as_millis() as u64,
258 model: response.model,
259 truncated,
260 })
261 }
262 Err(e) => {
263 self.increment_failed().await;
264 self.set_idle().await;
265
266 Err(ExecutionError::LLMError(e))
267 }
268 }
269 }
270}
271
/// Outcome of a successful `Executor::execute` call.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Identifier generated for this task (`task-<uuid>`).
    pub task_id: String,
    /// Id of the executor that produced the result.
    pub executor_id: String,
    /// Text content returned by the LLM.
    pub content: String,
    /// Total tokens consumed by the request, as reported by the provider.
    pub tokens_used: u32,
    /// Wall-clock execution time in milliseconds.
    pub duration_ms: u64,
    /// Model name reported by the provider response.
    pub model: String,
    /// True if the output hit the max-token limit and was cut short.
    pub truncated: bool,
}
290
impl ExecutionResult {
    /// Returns `true` when the LLM output was cut off by the max-token limit,
    /// i.e. `content` is likely incomplete.
    pub fn is_truncated(&self) -> bool {
        self.truncated
    }
}
297
/// Errors produced by `Executor::execute`.
#[derive(Debug, thiserror::Error)]
pub enum ExecutionError {
    /// The underlying LLM adapter call failed.
    #[error("LLM error: {0}")]
    LLMError(#[from] LLMError),

    /// `llm_config.provider` did not name a supported provider.
    #[error("Unsupported provider: {0}")]
    UnsupportedProvider(String),

    /// The executor was not in the `Idle` state when work was requested.
    #[error("Executor not idle")]
    NotIdle,
}
310
/// Fluent builder for `Executor`, obtained via `Executor::new`.
pub struct ExecutorBuilder {
    /// Executor identifier.
    id: String,
    /// Owning supervisor identifier.
    owner: String,
    /// LLM configuration; a Gemini default is used if unset at `build`.
    llm: Option<LLMConfig>,
    /// Sandbox configuration; a process sandbox is used if unset at `build`.
    sandbox: Option<SandboxConfig>,
    /// Capability set; starts with `Capability::CodeGeneration`.
    capabilities: HashSet<Capability>,
}
319
320impl ExecutorBuilder {
321 fn new(id: String, owner: String) -> Self {
322 Self {
323 id,
324 owner,
325 llm: None,
326 sandbox: None,
327 capabilities: HashSet::from([Capability::CodeGeneration]),
328 }
329 }
330
331 pub fn llm(mut self, config: LLMConfig) -> Self {
333 self.llm = Some(config);
334 self
335 }
336
337 pub fn llm_gemini(self, model: &str) -> Self {
339 self.llm(LLMConfig::gemini(model))
340 }
341
342 pub fn llm_openai(self, model: &str) -> Self {
344 self.llm(LLMConfig::openai(model))
345 }
346
347 pub fn llm_anthropic(self, model: &str) -> Self {
349 self.llm(LLMConfig::anthropic(model))
350 }
351
352 pub fn sandbox(mut self, config: SandboxConfig) -> Self {
354 self.sandbox = Some(config);
355 self
356 }
357
358 pub fn sandbox_process(self) -> Self {
360 self.sandbox(SandboxConfig::process())
361 }
362
363 pub fn sandbox_docker(self) -> Self {
365 self.sandbox(SandboxConfig::docker())
366 }
367
368 pub fn no_sandbox(self) -> Self {
370 self.sandbox(SandboxConfig::none())
371 }
372
373 pub fn capability(mut self, cap: Capability) -> Self {
375 self.capabilities.insert(cap);
376 self
377 }
378
379 pub fn capabilities(mut self, caps: impl IntoIterator<Item = Capability>) -> Self {
381 self.capabilities = caps.into_iter().collect();
382 self
383 }
384
385 pub fn build(self) -> Executor {
387 Executor::from_config(ExecutorConfig {
388 id: self.id,
389 owner_supervisor: self.owner,
390 llm: self
391 .llm
392 .unwrap_or_else(|| LLMConfig::gemini("gemini-2.0-flash")),
393 sandbox: self.sandbox.unwrap_or_else(SandboxConfig::process),
394 capabilities: self.capabilities,
395 })
396 }
397}
398
/// Builds an executor id of the form `exe-<supervisor>-<NNN>`, zero-padding
/// the index to at least three digits (wider indices are left unpadded).
pub fn generate_executor_id(supervisor_id: &str, index: usize) -> String {
    format!("exe-{supervisor_id}-{index:03}")
}
403
#[cfg(test)]
mod tests {
    use super::*;

    // Builder should propagate id/owner and merge the seeded
    // CodeGeneration capability with explicitly added ones.
    #[test]
    fn test_executor_builder() {
        let executor = Executor::new("exe-001", "sup-001")
            .llm_gemini("gemini-2.0-flash")
            .sandbox_process()
            .capability(Capability::CodeExecution)
            .build();

        assert_eq!(executor.id, "exe-001");
        assert_eq!(executor.owner, "sup-001");
        assert!(executor.has_capability(Capability::CodeGeneration));
        assert!(executor.has_capability(Capability::CodeExecution));
    }

    // Round-trip the lifecycle: Idle -> Busy -> Idle.
    #[tokio::test]
    async fn test_executor_state() {
        let executor = Executor::new("exe-001", "sup-001").build();

        assert!(executor.is_idle().await);

        executor.set_busy(String::from("task-123")).await;
        assert!(!executor.is_idle().await);

        executor.set_idle().await;
        assert!(executor.is_idle().await);
    }
}