// dasein_agentic_core/distributed/executor.rs
//! Executor - Worker that processes tasks.
//!
//! # Quick Start
//!
//! ```rust,ignore
//! let executor = Executor::new("exe-001", "sup-001")
//!     .llm(LLMConfig::gemini("gemini-2.0-flash"))
//!     .build();
//! ```
10
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;

use dasein_agentic_llm::{
    AnthropicAdapter, GeminiAdapter, LLMAdapter, LLMError, LLMMessage, OpenAIAdapter,
};

use super::config::{Capability, ExecutorConfig, LLMConfig, SandboxConfig};
22
23/// Current state of an executor.
/// Current lifecycle state of an executor.
///
/// Serializable so supervisors can persist or report worker state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutorState {
    /// Ready to accept a new task.
    Idle,
    /// Currently processing a task.
    Busy {
        /// ID of the task being processed.
        task_id: String,
        /// When processing began (UTC).
        started_at: DateTime<Utc>,
    },
    /// Lent to another supervisor under a time-limited lease.
    Borrowed {
        /// Supervisor currently borrowing this executor.
        to_supervisor: String,
        /// Identifier of the lease granting the borrow.
        lease_id: String,
        /// When the lease expires (UTC).
        expires_at: DateTime<Utc>,
    },
    /// Permanently stopped; no further tasks will be accepted.
    Stopped,
}
42
43/// An executor that processes tasks.
/// An executor that processes tasks.
///
/// State and counters are wrapped in `Arc<RwLock<..>>` so clones of the
/// handles can be shared across async tasks while the executor itself
/// is held behind a shared reference.
#[derive(Debug)]
pub struct Executor {
    /// Unique identifier
    pub id: String,
    /// Owner supervisor (never changes after construction)
    pub owner: String,
    /// Current controlling supervisor (may differ from `owner` if borrowed)
    pub current_supervisor: String,
    /// Current state (shared; mutated via the async setters)
    state: Arc<RwLock<ExecutorState>>,
    /// LLM configuration used by `execute`
    llm_config: LLMConfig,
    /// Capabilities this executor advertises
    capabilities: HashSet<Capability>,
    /// Count of successfully completed tasks
    tasks_completed: Arc<RwLock<u64>>,
    /// Count of failed tasks
    tasks_failed: Arc<RwLock<u64>>,
}
62
63impl Executor {
64    /// Create a new executor with minimal config.
65    ///
66    /// # Example
67    ///
68    /// ```rust,ignore
69    /// let exe = Executor::new("exe-001", "sup-001")
70    ///     .llm(LLMConfig::gemini("gemini-2.0-flash"))
71    ///     .build();
72    /// ```
73    pub fn new(id: impl Into<String>, owner: impl Into<String>) -> ExecutorBuilder {
74        ExecutorBuilder::new(id.into(), owner.into())
75    }
76
77    /// Create from full config.
78    pub fn from_config(config: ExecutorConfig) -> Self {
79        Self {
80            id: config.id,
81            owner: config.owner_supervisor.clone(),
82            current_supervisor: config.owner_supervisor,
83            state: Arc::new(RwLock::new(ExecutorState::Idle)),
84            llm_config: config.llm,
85            capabilities: config.capabilities,
86            tasks_completed: Arc::new(RwLock::new(0)),
87            tasks_failed: Arc::new(RwLock::new(0)),
88        }
89    }
90
91    /// Get current state.
92    pub async fn state(&self) -> ExecutorState {
93        self.state.read().await.clone()
94    }
95
96    /// Check if idle.
97    pub async fn is_idle(&self) -> bool {
98        matches!(*self.state.read().await, ExecutorState::Idle)
99    }
100
101    /// Check if borrowed.
102    pub async fn is_borrowed(&self) -> bool {
103        matches!(*self.state.read().await, ExecutorState::Borrowed { .. })
104    }
105
106    /// Set state to busy.
107    pub async fn set_busy(&self, task_id: String) {
108        *self.state.write().await = ExecutorState::Busy {
109            task_id,
110            started_at: Utc::now(),
111        };
112    }
113
114    /// Set state to idle.
115    pub async fn set_idle(&self) {
116        *self.state.write().await = ExecutorState::Idle;
117    }
118
119    /// Set state to borrowed.
120    pub async fn set_borrowed(
121        &self,
122        to_supervisor: String,
123        lease_id: String,
124        expires_at: DateTime<Utc>,
125    ) {
126        // Note: current_supervisor tracking would need interior mutability
127        // For now, we just update the state
128        *self.state.write().await = ExecutorState::Borrowed {
129            to_supervisor,
130            lease_id,
131            expires_at,
132        };
133    }
134
135    /// Return to owner.
136    pub async fn return_to_owner(&mut self) {
137        self.current_supervisor = self.owner.clone();
138        *self.state.write().await = ExecutorState::Idle;
139    }
140
141    /// Increment completed tasks.
142    pub async fn increment_completed(&self) {
143        *self.tasks_completed.write().await += 1;
144    }
145
146    /// Increment failed tasks.
147    pub async fn increment_failed(&self) {
148        *self.tasks_failed.write().await += 1;
149    }
150
151    /// Get capabilities.
152    pub fn capabilities(&self) -> &HashSet<Capability> {
153        &self.capabilities
154    }
155
156    /// Check if has capability.
157    pub fn has_capability(&self, cap: Capability) -> bool {
158        self.capabilities.contains(&cap)
159    }
160
161    /// Get LLM config.
162    pub fn llm_config(&self) -> &LLMConfig {
163        &self.llm_config
164    }
165
166    /// Execute a task with the LLM.
167    ///
168    /// This makes a real API call to the configured LLM provider.
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// let result = executor.execute(
174    ///     "You are a Rust expert.",
175    ///     "Write a function to check if a number is prime."
176    /// ).await?;
177    /// ```
178    pub async fn execute(
179        &self,
180        system_prompt: &str,
181        user_prompt: &str,
182    ) -> Result<ExecutionResult, ExecutionError> {
183        let task_id = format!("task-{}", uuid::Uuid::new_v4());
184
185        // Mark as busy
186        self.set_busy(task_id.clone()).await;
187
188        let start = std::time::Instant::now();
189
190        // Create LLM adapter based on config
191        let result = match self.llm_config.provider.as_str() {
192            "gemini" => {
193                let adapter = GeminiAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
194                    .with_temperature(self.llm_config.temperature)
195                    .with_max_tokens(self.llm_config.max_tokens);
196
197                let messages = vec![
198                    LLMMessage::system(system_prompt),
199                    LLMMessage::user(user_prompt),
200                ];
201
202                adapter.generate(&messages).await
203            }
204            "openai" => {
205                let adapter = OpenAIAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
206                    .with_temperature(self.llm_config.temperature)
207                    .with_max_tokens(self.llm_config.max_tokens);
208
209                let messages = vec![
210                    LLMMessage::system(system_prompt),
211                    LLMMessage::user(user_prompt),
212                ];
213
214                adapter.generate(&messages).await
215            }
216            "anthropic" => {
217                let adapter =
218                    AnthropicAdapter::new(&self.llm_config.api_key, &self.llm_config.model)
219                        .with_temperature(self.llm_config.temperature)
220                        .with_max_tokens(self.llm_config.max_tokens);
221
222                let messages = vec![
223                    LLMMessage::system(system_prompt),
224                    LLMMessage::user(user_prompt),
225                ];
226
227                adapter.generate(&messages).await
228            }
229            other => {
230                self.set_idle().await;
231                return Err(ExecutionError::UnsupportedProvider(other.to_string()));
232            }
233        };
234
235        let duration = start.elapsed();
236
237        match result {
238            Ok(response) => {
239                self.increment_completed().await;
240                self.set_idle().await;
241
242                // Check if response was truncated
243                let truncated = response.finish_reason == dasein_agentic_llm::FinishReason::Length;
244                if truncated {
245                    tracing::warn!(
246                        "LLM output truncated (hit max_tokens limit). Model: {}, tokens: {}",
247                        response.model,
248                        response.tokens_used.total
249                    );
250                }
251
252                Ok(ExecutionResult {
253                    task_id,
254                    executor_id: self.id.clone(),
255                    content: response.content,
256                    tokens_used: response.tokens_used.total,
257                    duration_ms: duration.as_millis() as u64,
258                    model: response.model,
259                    truncated,
260                })
261            }
262            Err(e) => {
263                self.increment_failed().await;
264                self.set_idle().await;
265
266                Err(ExecutionError::LLMError(e))
267            }
268        }
269    }
270}
271
272/// Result of an execution.
/// Result of a successful execution, produced by [`Executor::execute`].
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// ID of the task that was executed (generated per call)
    pub task_id: String,
    /// ID of the executor that processed it
    pub executor_id: String,
    /// Generated content returned by the LLM
    pub content: String,
    /// Total tokens used (prompt + completion)
    pub tokens_used: u32,
    /// Wall-clock duration of the LLM call in milliseconds
    pub duration_ms: u64,
    /// Model that produced the response
    pub model: String,
    /// Whether output was truncated (hit the max_tokens limit)
    pub truncated: bool,
}
290
impl ExecutionResult {
    /// Check if the response was truncated due to the max tokens limit.
    ///
    /// Convenience accessor for the public `truncated` field.
    pub fn is_truncated(&self) -> bool {
        self.truncated
    }
}
297
298/// Execution errors.
/// Errors returned by [`Executor::execute`].
#[derive(Debug, thiserror::Error)]
pub enum ExecutionError {
    /// The underlying LLM API call failed.
    #[error("LLM error: {0}")]
    LLMError(#[from] LLMError),

    /// The configured provider string is not one of the supported backends.
    #[error("Unsupported provider: {0}")]
    UnsupportedProvider(String),

    /// The executor was not idle when a task was requested.
    /// NOTE(review): currently never constructed in this file — presumably
    /// reserved for a future idle-check in `execute`; verify against callers.
    #[error("Executor not idle")]
    NotIdle,
}
310
311/// Builder for creating executors.
/// Builder for creating executors.
///
/// Obtained from [`Executor::new`]; unset options fall back to defaults
/// in `build` (Gemini LLM, process sandbox, CodeGeneration capability).
pub struct ExecutorBuilder {
    // Executor identifier.
    id: String,
    // Owning supervisor identifier.
    owner: String,
    // LLM config; defaults to Gemini if never set.
    llm: Option<LLMConfig>,
    // Sandbox config; defaults to process sandbox if never set.
    sandbox: Option<SandboxConfig>,
    // Starts with CodeGeneration; extended via `capability`/`capabilities`.
    capabilities: HashSet<Capability>,
}
319
320impl ExecutorBuilder {
321    fn new(id: String, owner: String) -> Self {
322        Self {
323            id,
324            owner,
325            llm: None,
326            sandbox: None,
327            capabilities: HashSet::from([Capability::CodeGeneration]),
328        }
329    }
330
331    /// Set LLM configuration.
332    pub fn llm(mut self, config: LLMConfig) -> Self {
333        self.llm = Some(config);
334        self
335    }
336
337    /// Use Gemini (shortcut).
338    pub fn llm_gemini(self, model: &str) -> Self {
339        self.llm(LLMConfig::gemini(model))
340    }
341
342    /// Use OpenAI (shortcut).
343    pub fn llm_openai(self, model: &str) -> Self {
344        self.llm(LLMConfig::openai(model))
345    }
346
347    /// Use Anthropic/Claude (shortcut).
348    pub fn llm_anthropic(self, model: &str) -> Self {
349        self.llm(LLMConfig::anthropic(model))
350    }
351
352    /// Set sandbox configuration.
353    pub fn sandbox(mut self, config: SandboxConfig) -> Self {
354        self.sandbox = Some(config);
355        self
356    }
357
358    /// Use process sandbox (shortcut).
359    pub fn sandbox_process(self) -> Self {
360        self.sandbox(SandboxConfig::process())
361    }
362
363    /// Use docker sandbox (shortcut).
364    pub fn sandbox_docker(self) -> Self {
365        self.sandbox(SandboxConfig::docker())
366    }
367
368    /// No sandbox (generation only).
369    pub fn no_sandbox(self) -> Self {
370        self.sandbox(SandboxConfig::none())
371    }
372
373    /// Add capability.
374    pub fn capability(mut self, cap: Capability) -> Self {
375        self.capabilities.insert(cap);
376        self
377    }
378
379    /// Set capabilities.
380    pub fn capabilities(mut self, caps: impl IntoIterator<Item = Capability>) -> Self {
381        self.capabilities = caps.into_iter().collect();
382        self
383    }
384
385    /// Build the executor.
386    pub fn build(self) -> Executor {
387        Executor::from_config(ExecutorConfig {
388            id: self.id,
389            owner_supervisor: self.owner,
390            llm: self
391                .llm
392                .unwrap_or_else(|| LLMConfig::gemini("gemini-2.0-flash")),
393            sandbox: self.sandbox.unwrap_or_else(SandboxConfig::process),
394            capabilities: self.capabilities,
395        })
396    }
397}
398
399/// Generate a unique executor ID.
/// Generate a unique executor ID of the form `exe-<supervisor>-<NNN>`,
/// where the index is zero-padded to at least three digits.
pub fn generate_executor_id(supervisor_id: &str, index: usize) -> String {
    format!("exe-{supervisor_id}-{index:03}")
}
403
#[cfg(test)]
mod tests {
    use super::*;

    // Builder wires up id/owner and merges explicit capabilities with the
    // default CodeGeneration capability.
    #[test]
    fn test_executor_builder() {
        let exe = Executor::new("exe-001", "sup-001")
            .llm_gemini("gemini-2.0-flash")
            .sandbox_process()
            .capability(Capability::CodeExecution)
            .build();

        assert_eq!(exe.id, "exe-001");
        assert_eq!(exe.owner, "sup-001");
        assert!(exe.has_capability(Capability::CodeGeneration));
        assert!(exe.has_capability(Capability::CodeExecution));
    }

    // State transitions: starts idle, set_busy leaves idle, set_idle returns.
    #[tokio::test]
    async fn test_executor_state() {
        let exe = Executor::new("exe-001", "sup-001").build();

        assert!(exe.is_idle().await);

        exe.set_busy("task-123".into()).await;
        assert!(!exe.is_idle().await);

        exe.set_idle().await;
        assert!(exe.is_idle().await);
    }
}
434}