mecha10_nodes_llm_command/
lib.rs

1//! LLM Command Node
2//!
3//! Natural language command parsing via LLM APIs (OpenAI, Claude, Ollama).
4//!
5//! # Topic Interface
6//!
7//! **Input:** `/ai/command` (CommandMessage)
8//! **Output:** `/ai/response` (ResponseMessage)
9//! **Output:** `/nav/goal` (NavigationGoal)
10//! **Output:** `/motor/cmd_vel` (MotorCommand)
11//! **Output:** `/behavior/execute` (BehaviorCommand)
12//!
13//! # Provider Agnosticism
14//!
15//! This node doesn't care about the LLM provider:
16//! - OpenAI API → Uses OpenAIProvider
17//! - Claude API → Uses ClaudeProvider
18//! - Local Ollama → Uses LocalProvider
19//!
20//! Same interface = same node works everywhere!
21
22mod config;
23
24pub use config::{OpenAIReasoningConfig, TopicConfig};
25
26use anyhow::{Context as AnyhowContext, Result};
27use mecha10_ai_llm::prelude::*;
28use mecha10_core::behavior_interrupt::BehaviorInterruptTrigger;
29use mecha10_core::health::HealthReportingExt;
30use mecha10_core::messages::{HealthStatus, Message};
31use mecha10_core::prelude::*;
32use mecha10_core::topics::Topic;
33use serde::{Deserialize, Serialize};
34use std::env;
35use tracing::{info, warn};
36
37// === Message Types ===
38
/// Natural language command from user
///
/// Consumed from the command input topic (default `/ai/command`) and fed to
/// the LLM by `OpenAIReasoningNode::process_command`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandMessage {
    /// Natural language command text
    pub text: String,
    /// Timestamp of command
    // presumably UNIX epoch seconds, matching this node's outgoing
    // timestamps — TODO confirm with the producer
    pub timestamp: u64,
    /// Optional user ID
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
}

impl Message for CommandMessage {}
52
/// Response from LLM
///
/// Published on the response output topic (default `/ai/response`) after each
/// command, and once at startup as a "ready" announcement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseMessage {
    /// Response text
    pub text: String,
    /// Timestamp of response (UNIX epoch seconds)
    pub timestamp: u64,
    /// Whether an action was extracted
    pub action_taken: bool,
    /// Optional error message
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

impl Message for ResponseMessage {}
68
/// Navigation goal
///
/// Published on the nav-goal output topic (default `/nav/goal`) when the LLM
/// returns a `navigate` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NavigationGoal {
    // Target x coordinate (units/frame are defined by the consumer — not
    // visible here)
    pub x: f64,
    // Target y coordinate
    pub y: f64,
    /// Target heading; defaults to 0.0 when absent from the payload
    #[serde(default)]
    pub theta: f64,
    // Timestamp of goal creation (UNIX epoch seconds)
    pub timestamp: u64,
}

impl Message for NavigationGoal {}
80
/// Motor velocity command
///
/// Published on the motor output topic (default `/motor/cmd_vel`) for `motor`
/// actions. When `duration_secs` is set, this node later publishes a
/// zero-velocity follow-up command to stop the robot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MotorCommand {
    /// Linear velocity (m/s)
    pub linear: f64,
    /// Angular velocity (rad/s)
    pub angular: f64,
    /// Optional duration in seconds; omitted from the JSON when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_secs: Option<f64>,
    // Timestamp of command creation (UNIX epoch seconds)
    pub timestamp: u64,
}

impl Message for MotorCommand {}
95
/// Behavior execution command
///
/// Published on the behavior output topic (default `/behavior/execute`) when
/// the LLM returns a `behavior` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BehaviorCommand {
    /// Behavior name to execute
    pub name: String,
    /// Optional parameters (opaque JSON passed through from the LLM)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<serde_json::Value>,
    // Timestamp of command creation (UNIX epoch seconds)
    pub timestamp: u64,
}

impl Message for BehaviorCommand {}
108
/// Object detection (from object-detector node)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Detection {
    // Numeric class id from the detector's label set
    pub class_id: u32,
    // Human-readable class label
    pub class_name: String,
    // Confidence score; rendered as a percentage by `format_detections`,
    // so presumably in [0.0, 1.0] — confirm with the detector node
    pub confidence: f32,
}

/// Detection result from vision system
///
/// The most recent result is cached and injected into the LLM prompt when a
/// vision-like query is received.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectionResult {
    // Frame counter from the detector
    pub frame_id: u64,
    // Timestamp of the frame — epoch/units defined by the producer
    pub timestamp: u64,
    // All detections found in this frame (may be empty)
    pub detections: Vec<Detection>,
    // Model inference latency in milliseconds
    pub inference_time_ms: f32,
    // Name of the model that produced the detections
    pub model_name: String,
}

impl Message for DetectionResult {}
128
/// Parsed action from LLM response
///
/// Deserialized from a JSON object embedded in the LLM's reply. With the
/// `tag = "action"` / `rename_all = "snake_case"` serde attributes the
/// expected wire format is e.g.
/// `{"action": "navigate", "goal": {"x": 1.0, "y": 2.0}}`,
/// `{"action": "motor", "linear": 0.2, "angular": 0.0}`, or
/// `{"action": "behavior", "name": "wave"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ParsedAction {
    /// Drive to a pose; republished as a `NavigationGoal`.
    Navigate {
        goal: NavigationGoalData,
    },
    /// Direct velocity command; republished as a `MotorCommand`.
    Motor {
        linear: f64,
        angular: f64,
        #[serde(skip_serializing_if = "Option::is_none")]
        duration_secs: Option<f64>,
    },
    /// Named behavior invocation; republished as a `BehaviorCommand`.
    Behavior {
        name: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        params: Option<serde_json::Value>,
    },
}

/// Pose payload of a `navigate` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NavigationGoalData {
    pub x: f64,
    pub y: f64,
    /// Defaults to 0.0 when the LLM omits it
    #[serde(default)]
    pub theta: f64,
}
156
157// === OpenAI Reasoning Node ===
158
/// Node that turns natural-language commands into robot actions via an LLM.
///
/// Holds the configured LLM client, the most recent vision detections (used
/// to add context to vision-like queries), and a trigger that interrupts the
/// behavior tree when manual motion commands are published.
pub struct OpenAIReasoningNode {
    // Node configuration: provider, model, topics, vision flag, etc.
    config: OpenAIReasoningConfig,
    // LLM client / conversation wrapper
    llm: LlmNode,
    // Most recent detection result from the vision system, if any
    // (updated directly by the `run` loop)
    latest_detections: Option<DetectionResult>,
    // Fires a behavior-tree interrupt when motor/navigation actions are issued
    behavior_trigger: BehaviorInterruptTrigger,
}
165
166impl OpenAIReasoningNode {
167    pub fn new(config: OpenAIReasoningConfig) -> Result<Self> {
168        // Get API key from environment
169        let api_key = match config.provider.as_str() {
170            "openai" => env::var("OPENAI_API_KEY").context("OPENAI_API_KEY environment variable not set")?,
171            "claude" => env::var("ANTHROPIC_API_KEY").context("ANTHROPIC_API_KEY environment variable not set")?,
172            "local" => String::new(), // No API key needed for local
173            _ => anyhow::bail!("Unknown provider: {}", config.provider),
174        };
175
176        // Build LLM node
177        let mut builder = match config.provider.as_str() {
178            "openai" => LlmNode::openai(),
179            "claude" => LlmNode::claude(),
180            "local" => LlmNode::local(),
181            _ => unreachable!(),
182        };
183
184        builder = builder
185            .with_model(&config.llm_model)
186            .with_temperature(config.temperature)
187            .with_max_tokens(config.max_tokens);
188
189        if !api_key.is_empty() {
190            builder = builder.with_api_key(api_key);
191        }
192
193        if let Some(ref system_prompt) = config.system_prompt {
194            builder = builder.with_system_prompt(system_prompt);
195        }
196
197        let llm = builder.build()?;
198
199        // Create behavior interrupt trigger
200        let behavior_trigger = BehaviorInterruptTrigger::new("llm-command", config.behavior_interrupt.clone());
201
202        Ok(Self {
203            config,
204            llm,
205            latest_detections: None,
206            behavior_trigger,
207        })
208    }
209
210    /// Process a command and generate a response
211    async fn process_command(&mut self, ctx: &Context, command: &CommandMessage) -> Result<ResponseMessage> {
212        info!("🧠 Processing command: {}", command.text);
213
214        // Add user message to conversation
215        self.llm.clear_messages(); // Start fresh for each command
216
217        // Check if this is a vision query
218        let is_vision_query = self.is_vision_query(&command.text);
219
220        if is_vision_query && self.config.vision_enabled {
221            // Add detection context to the prompt
222            let prompt_with_context = if let Some(ref detections) = self.latest_detections {
223                let detection_text = self.format_detections(detections);
224                info!("đŸ‘ī¸ Vision query detected - adding detection context");
225                format!("{}\n\nCurrent visual detections:\n{}", command.text, detection_text)
226            } else {
227                info!("đŸ‘ī¸ Vision query detected but no detections available");
228                format!(
229                    "{}\n\n(No visual detections available - object detector may not be running)",
230                    command.text
231                )
232            };
233            self.llm.user(&prompt_with_context);
234        } else {
235            self.llm.user(&command.text);
236        }
237
238        // Generate response using BehaviorNode tick
239        match self.llm.tick(ctx).await {
240            Ok(_) => {
241                if let Some(response_text) = self.llm.last_response() {
242                    // Clone response_text to avoid borrow conflict
243                    let response_text = response_text.to_string();
244                    info!("✅ LLM response: {}", response_text);
245
246                    // Try to parse structured action from response
247                    let action_taken = self.try_parse_and_publish_action(ctx, &response_text).await;
248
249                    Ok(ResponseMessage {
250                        text: response_text,
251                        timestamp: std::time::SystemTime::now()
252                            .duration_since(std::time::UNIX_EPOCH)
253                            .unwrap()
254                            .as_secs(),
255                        action_taken,
256                        error: None,
257                    })
258                } else {
259                    Ok(ResponseMessage {
260                        text: "No response generated".to_string(),
261                        timestamp: std::time::SystemTime::now()
262                            .duration_since(std::time::UNIX_EPOCH)
263                            .unwrap()
264                            .as_secs(),
265                        action_taken: false,
266                        error: Some("Empty response from LLM".to_string()),
267                    })
268                }
269            }
270            Err(e) => {
271                warn!("❌ LLM error: {}", e);
272                Ok(ResponseMessage {
273                    text: format!("Error: {}", e),
274                    timestamp: std::time::SystemTime::now()
275                        .duration_since(std::time::UNIX_EPOCH)
276                        .unwrap()
277                        .as_secs(),
278                    action_taken: false,
279                    error: Some(e.to_string()),
280                })
281            }
282        }
283    }
284
285    /// Try to parse and publish structured actions from LLM response
286    async fn try_parse_and_publish_action(&mut self, ctx: &Context, response: &str) -> bool {
287        // Try to extract JSON from response (may be wrapped in markdown)
288        let json_str = if let Some(start) = response.find('{') {
289            if let Some(end) = response.rfind('}') {
290                &response[start..=end]
291            } else {
292                return false;
293            }
294        } else {
295            return false;
296        };
297
298        // Parse action
299        match serde_json::from_str::<ParsedAction>(json_str) {
300            Ok(action) => {
301                info!("📤 Parsed action: {:?}", action);
302                self.publish_action(ctx, action).await.unwrap_or_else(|e| {
303                    warn!("Failed to publish action: {}", e);
304                });
305                true
306            }
307            Err(e) => {
308                info!("â„šī¸  No structured action found ({})", e);
309                false
310            }
311        }
312    }
313
314    /// Publish parsed action to appropriate topic
315    async fn publish_action(&mut self, ctx: &Context, action: ParsedAction) -> Result<()> {
316        let timestamp = std::time::SystemTime::now()
317            .duration_since(std::time::UNIX_EPOCH)?
318            .as_secs();
319
320        // Interrupt behavior tree for motor/navigation commands
321        match &action {
322            ParsedAction::Navigate { .. } | ParsedAction::Motor { .. } => {
323                self.behavior_trigger.interrupt(ctx).await?;
324            }
325            _ => {}
326        }
327
328        match action {
329            ParsedAction::Navigate { goal } => {
330                let nav_goal = NavigationGoal {
331                    x: goal.x,
332                    y: goal.y,
333                    theta: goal.theta,
334                    timestamp,
335                };
336
337                let topic_path = Box::leak(self.config.topics.nav_goal_out.clone().into_boxed_str());
338                let topic = Topic::<NavigationGoal>::new(topic_path);
339                ctx.publish_to(topic, &nav_goal).await?;
340                info!("📍 Published navigation goal: ({}, {})", goal.x, goal.y);
341            }
342            ParsedAction::Motor {
343                linear,
344                angular,
345                duration_secs,
346            } => {
347                let motor_cmd = MotorCommand {
348                    linear,
349                    angular,
350                    duration_secs,
351                    timestamp,
352                };
353
354                let topic_path = Box::leak(self.config.topics.motor_cmd_out.clone().into_boxed_str());
355                let topic = Topic::<MotorCommand>::new(topic_path);
356                ctx.publish_to(topic, &motor_cmd).await?;
357
358                if let Some(duration) = duration_secs {
359                    info!(
360                        "🎮 Published motor command: linear={}, angular={}, duration={}s",
361                        linear, angular, duration
362                    );
363
364                    // Schedule stop command after duration
365                    let ctx_clone = ctx.clone();
366                    // Clone the topic path string for the async task
367                    let topic_str = self.config.topics.motor_cmd_out.clone();
368                    tokio::spawn(async move {
369                        tokio::time::sleep(tokio::time::Duration::from_secs_f64(duration)).await;
370
371                        let stop_cmd = MotorCommand {
372                            linear: 0.0,
373                            angular: 0.0,
374                            duration_secs: None,
375                            timestamp: std::time::SystemTime::now()
376                                .duration_since(std::time::UNIX_EPOCH)
377                                .unwrap()
378                                .as_secs(),
379                        };
380
381                        let stop_topic_path = Box::leak(topic_str.into_boxed_str());
382                        let stop_topic = Topic::<MotorCommand>::new(stop_topic_path);
383                        if let Err(e) = ctx_clone.publish_to(stop_topic, &stop_cmd).await {
384                            warn!("Failed to send stop command after duration: {}", e);
385                        } else {
386                            info!("🛑 Sent stop command after {}s duration", duration);
387                        }
388                    });
389                } else {
390                    info!("🎮 Published motor command: linear={}, angular={}", linear, angular);
391                }
392            }
393            ParsedAction::Behavior { name, params } => {
394                let behavior_cmd = BehaviorCommand {
395                    name: name.clone(),
396                    params,
397                    timestamp,
398                };
399
400                let topic_path = Box::leak(self.config.topics.behavior_out.clone().into_boxed_str());
401                let topic = Topic::<BehaviorCommand>::new(topic_path);
402                ctx.publish_to(topic, &behavior_cmd).await?;
403                info!("🤖 Published behavior command: {}", name);
404            }
405        }
406
407        Ok(())
408    }
409
410    /// Check if a command is asking about vision
411    fn is_vision_query(&self, text: &str) -> bool {
412        let text_lower = text.to_lowercase();
413        text_lower.contains("see")
414            || text_lower.contains("look")
415            || text_lower.contains("visible")
416            || text_lower.contains("detect")
417            || text_lower.contains("object")
418            || text_lower.contains("person")
419            || text_lower.contains("describe")
420            || text_lower.contains("what")
421            || text_lower.contains("who")
422            || text_lower.contains("where")
423            || text_lower.contains("count")
424            || text_lower.contains("how many")
425    }
426
427    /// Format detection results as human-readable text
428    fn format_detections(&self, detections: &DetectionResult) -> String {
429        if detections.detections.is_empty() {
430            return "No objects detected in the current frame.".to_string();
431        }
432
433        let mut lines = vec![];
434        lines.push(format!(
435            "Detected {} object(s) in frame #{}:",
436            detections.detections.len(),
437            detections.frame_id
438        ));
439
440        for (idx, det) in detections.detections.iter().enumerate() {
441            lines.push(format!(
442                "  {}. {} ({:.1}% confidence)",
443                idx + 1,
444                det.class_name,
445                det.confidence * 100.0
446            ));
447        }
448
449        lines.join("\n")
450    }
451}
452
453// === Node Entry Point ===
454
/// Node entry point: wires up topics, constructs the reasoning node, and runs
/// the command/detection processing loop forever.
///
/// # Errors
/// Returns an error if context creation, config loading, provider validation,
/// node construction, or any subscribe/publish operation fails. On success the
/// main loop never returns.
pub async fn run() -> Result<()> {
    info!("🤖 Starting LLM Command Node");

    // Create context
    let ctx = Context::new("llm-command").await?;

    // Start automatic health reporting (reports healthy every 5 seconds)
    ctx.start_health_reporting(|| async { HealthStatus::healthy() }).await?;

    // Load config
    let config: OpenAIReasoningConfig = ctx.load_node_config("llm-command").await?;
    info!("Configuration: {:?}", config);

    // Validate API key is set
    // NOTE(review): OpenAIReasoningNode::new repeats this check; this copy
    // exists to fail early with the friendlier ".env file" hint.
    match config.provider.as_str() {
        "openai" => {
            if env::var("OPENAI_API_KEY").is_err() {
                anyhow::bail!(
                    "OPENAI_API_KEY environment variable not set. Please set it in your .env file or environment."
                );
            }
        }
        "claude" => {
            if env::var("ANTHROPIC_API_KEY").is_err() {
                anyhow::bail!(
                    "ANTHROPIC_API_KEY environment variable not set. Please set it in your .env file or environment."
                );
            }
        }
        "local" => {
            info!("Using local LLM provider (Ollama)");
        }
        _ => {
            anyhow::bail!("Unknown provider: {}", config.provider);
        }
    }

    // Create node
    let mut node = OpenAIReasoningNode::new(config.clone())?;

    info!(
        "✅ LLM Node initialized with {} provider ({})",
        config.provider, config.llm_model
    );

    // Subscribe to command topic.
    // Box::leak here is a one-time, bounded leak at startup: Topic paths must
    // be &'static str (see the annotation on detection_topic_path below) and
    // these live for the life of the process anyway.
    let command_topic_path = Box::leak(config.topics.command_in.clone().into_boxed_str());
    let response_topic_path = Box::leak(config.topics.response_out.clone().into_boxed_str());

    let command_topic = Topic::<CommandMessage>::new(command_topic_path);
    let response_topic = Topic::<ResponseMessage>::new(response_topic_path);

    let mut command_receiver = ctx.subscribe(command_topic).await?;

    info!("📡 Subscribed to: {}", command_topic_path);
    info!("📤 Publishing responses to: {}", response_topic_path);

    // Subscribe to vision detections (if vision is enabled)
    let mut detection_receiver = if config.vision_enabled {
        let detection_topic_path: &'static str = Box::leak(config.topics.detections_in.clone().into_boxed_str());
        let detection_topic = Topic::<DetectionResult>::new(detection_topic_path);
        let receiver = ctx.subscribe(detection_topic).await?;
        info!("đŸ‘ī¸ Subscribed to vision detections: {}", detection_topic_path);
        Some(receiver)
    } else {
        info!("đŸ‘ī¸ Vision queries disabled");
        None
    };

    info!("🔄 Entering main processing loop");

    // Publish ready message so dashboard knows node is available
    let ready_message = ResponseMessage {
        text: "LLM command node ready".to_string(),
        timestamp: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
        action_taken: false,
        error: None,
    };
    ctx.publish_to(response_topic, &ready_message).await?;
    info!("📤 Published ready message to {}", response_topic_path);

    // Main processing loop.
    // Commands are processed inline (the LLM call is awaited here), so
    // detection updates queue in their channel during a long LLM request and
    // are drained afterwards.
    // NOTE(review): if a receiver's channel closes, recv() returns None and
    // that select! branch is disabled — presumably the channels stay open for
    // the process lifetime; confirm against Context's subscribe semantics.
    loop {
        tokio::select! {
            Some(command_msg) = command_receiver.recv() => {
                info!("📨 Received command: {}", command_msg.text);

                // Process command
                match node.process_command(&ctx, &command_msg).await {
                    Ok(response) => {
                        // Publish response
                        ctx.publish_to(response_topic, &response).await?;
                        info!("📤 Published response");
                    }
                    Err(e) => {
                        warn!("❌ Error processing command: {}", e);

                        // Publish error response
                        let error_response = ResponseMessage {
                            text: format!("Error: {}", e),
                            timestamp: std::time::SystemTime::now()
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap()
                                .as_secs(),
                            action_taken: false,
                            error: Some(e.to_string()),
                        };
                        ctx.publish_to(response_topic, &error_response).await?;
                    }
                }
            }

            // When vision is disabled there is no receiver; pending() keeps
            // this branch permanently unresolved instead of closing the select.
            Some(detection_msg) = async {
                match &mut detection_receiver {
                    Some(receiver) => receiver.recv().await,
                    None => std::future::pending().await,
                }
            } => {
                // Update latest detections (cache only; consumed on the next
                // vision query)
                node.latest_detections = Some(detection_msg);
            }
        }
    }
}
581}