1mod config;
23
24pub use config::{OpenAIReasoningConfig, TopicConfig};
25
26use anyhow::{Context as AnyhowContext, Result};
27use mecha10_ai_llm::prelude::*;
28use mecha10_core::behavior_interrupt::BehaviorInterruptTrigger;
29use mecha10_core::health::HealthReportingExt;
30use mecha10_core::messages::{HealthStatus, Message};
31use mecha10_core::prelude::*;
32use mecha10_core::topics::Topic;
33use serde::{Deserialize, Serialize};
34use std::env;
35use tracing::{info, warn};
36
/// A natural-language command from a user, received on the command-in topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandMessage {
    /// Raw command text, e.g. "drive forward for two seconds".
    pub text: String,
    /// Timestamp of the command — presumably seconds since the Unix epoch,
    /// matching how this node stamps its own messages (set by the sender).
    pub timestamp: u64,
    /// Optional identifier of the issuing user; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub user_id: Option<String>,
}

impl Message for CommandMessage {}
52
/// The node's reply to a `CommandMessage`, published on the response-out topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseMessage {
    /// Text produced by the LLM, or an error description on failure.
    pub text: String,
    /// Seconds since the Unix epoch when this response was created.
    pub timestamp: u64,
    /// Whether a structured action was parsed from the LLM reply and published.
    pub action_taken: bool,
    /// Error description when processing failed; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

impl Message for ResponseMessage {}
68
/// A 2D navigation target, published when the LLM requests a `navigate` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NavigationGoal {
    /// Target x coordinate (units defined by the navigation stack — not visible here).
    pub x: f64,
    /// Target y coordinate.
    pub y: f64,
    /// Target heading; defaults to 0.0 when absent from the incoming JSON.
    #[serde(default)]
    pub theta: f64,
    /// Seconds since the Unix epoch when the goal was published.
    pub timestamp: u64,
}

impl Message for NavigationGoal {}
80
/// A direct velocity command, published when the LLM requests a `motor` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MotorCommand {
    /// Linear velocity (units defined by the motor controller — not visible here).
    pub linear: f64,
    /// Angular velocity.
    pub angular: f64,
    /// When set, this node schedules a zero-velocity stop command after this
    /// many seconds (see `publish_action`). Omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub duration_secs: Option<f64>,
    /// Seconds since the Unix epoch when the command was published.
    pub timestamp: u64,
}

impl Message for MotorCommand {}
95
/// A request to start a named behavior, published for the `behavior` action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BehaviorCommand {
    /// Behavior name as produced by the LLM; interpreted downstream.
    pub name: String,
    /// Optional free-form behavior parameters; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<serde_json::Value>,
    /// Seconds since the Unix epoch when the command was published.
    pub timestamp: u64,
}

impl Message for BehaviorCommand {}
108
/// A single detected object within a `DetectionResult` frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Detection {
    /// Numeric class identifier from the detector's label set.
    pub class_id: u32,
    /// Human-readable class label, e.g. "person".
    pub class_name: String,
    /// Confidence score in [0, 1] (rendered as a percentage by `format_detections`).
    pub confidence: f32,
}
116
/// One frame's worth of object detections, consumed from the vision topic and
/// cached so vision-style questions can be answered with current context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectionResult {
    /// Monotonic frame counter assigned by the detector.
    pub frame_id: u64,
    /// Timestamp of the frame (set by the detector; unit not visible here).
    pub timestamp: u64,
    /// All objects found in this frame; may be empty.
    pub detections: Vec<Detection>,
    /// Model inference latency for this frame, in milliseconds.
    pub inference_time_ms: f32,
    /// Name of the detection model that produced this result.
    pub model_name: String,
}

impl Message for DetectionResult {}
128
/// Structured action extracted from an LLM reply.
///
/// Deserialized from a JSON object embedded in the response text; the
/// `action` tag selects the variant, e.g. `{"action": "motor", "linear": 0.5, ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ParsedAction {
    /// Navigate to a 2D pose.
    Navigate {
        goal: NavigationGoalData,
    },
    /// Drive with the given velocities, optionally auto-stopping after
    /// `duration_secs` seconds.
    Motor {
        linear: f64,
        angular: f64,
        #[serde(skip_serializing_if = "Option::is_none")]
        duration_secs: Option<f64>,
    },
    /// Start a named behavior with optional free-form parameters.
    Behavior {
        name: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        params: Option<serde_json::Value>,
    },
}
148
/// Pose payload of `ParsedAction::Navigate`, mirroring `NavigationGoal`
/// minus the timestamp (which is stamped at publish time).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NavigationGoalData {
    /// Target x coordinate.
    pub x: f64,
    /// Target y coordinate.
    pub y: f64,
    /// Target heading; defaults to 0.0 when the LLM omits it.
    #[serde(default)]
    pub theta: f64,
}
156
/// Core state of the LLM command node: configuration, the LLM client,
/// the most recent vision snapshot, and the behavior-interrupt trigger.
pub struct OpenAIReasoningNode {
    /// Node configuration (provider, model, topics, vision flag, ...).
    config: OpenAIReasoningConfig,
    /// Underlying LLM client; conversation is cleared before each command.
    llm: LlmNode,
    /// Latest detection frame received on the vision topic, if any.
    latest_detections: Option<DetectionResult>,
    /// Trigger used to preempt a running behavior before motion commands.
    behavior_trigger: BehaviorInterruptTrigger,
}
165
166impl OpenAIReasoningNode {
167 pub fn new(config: OpenAIReasoningConfig) -> Result<Self> {
168 let api_key = match config.provider.as_str() {
170 "openai" => env::var("OPENAI_API_KEY").context("OPENAI_API_KEY environment variable not set")?,
171 "claude" => env::var("ANTHROPIC_API_KEY").context("ANTHROPIC_API_KEY environment variable not set")?,
172 "local" => String::new(), _ => anyhow::bail!("Unknown provider: {}", config.provider),
174 };
175
176 let mut builder = match config.provider.as_str() {
178 "openai" => LlmNode::openai(),
179 "claude" => LlmNode::claude(),
180 "local" => LlmNode::local(),
181 _ => unreachable!(),
182 };
183
184 builder = builder
185 .with_model(&config.llm_model)
186 .with_temperature(config.temperature)
187 .with_max_tokens(config.max_tokens);
188
189 if !api_key.is_empty() {
190 builder = builder.with_api_key(api_key);
191 }
192
193 if let Some(ref system_prompt) = config.system_prompt {
194 builder = builder.with_system_prompt(system_prompt);
195 }
196
197 let llm = builder.build()?;
198
199 let behavior_trigger = BehaviorInterruptTrigger::new("llm-command", config.behavior_interrupt.clone());
201
202 Ok(Self {
203 config,
204 llm,
205 latest_detections: None,
206 behavior_trigger,
207 })
208 }
209
210 async fn process_command(&mut self, ctx: &Context, command: &CommandMessage) -> Result<ResponseMessage> {
212 info!("đ§ Processing command: {}", command.text);
213
214 self.llm.clear_messages(); let is_vision_query = self.is_vision_query(&command.text);
219
220 if is_vision_query && self.config.vision_enabled {
221 let prompt_with_context = if let Some(ref detections) = self.latest_detections {
223 let detection_text = self.format_detections(detections);
224 info!("đī¸ Vision query detected - adding detection context");
225 format!("{}\n\nCurrent visual detections:\n{}", command.text, detection_text)
226 } else {
227 info!("đī¸ Vision query detected but no detections available");
228 format!(
229 "{}\n\n(No visual detections available - object detector may not be running)",
230 command.text
231 )
232 };
233 self.llm.user(&prompt_with_context);
234 } else {
235 self.llm.user(&command.text);
236 }
237
238 match self.llm.tick(ctx).await {
240 Ok(_) => {
241 if let Some(response_text) = self.llm.last_response() {
242 let response_text = response_text.to_string();
244 info!("â
LLM response: {}", response_text);
245
246 let action_taken = self.try_parse_and_publish_action(ctx, &response_text).await;
248
249 Ok(ResponseMessage {
250 text: response_text,
251 timestamp: std::time::SystemTime::now()
252 .duration_since(std::time::UNIX_EPOCH)
253 .unwrap()
254 .as_secs(),
255 action_taken,
256 error: None,
257 })
258 } else {
259 Ok(ResponseMessage {
260 text: "No response generated".to_string(),
261 timestamp: std::time::SystemTime::now()
262 .duration_since(std::time::UNIX_EPOCH)
263 .unwrap()
264 .as_secs(),
265 action_taken: false,
266 error: Some("Empty response from LLM".to_string()),
267 })
268 }
269 }
270 Err(e) => {
271 warn!("â LLM error: {}", e);
272 Ok(ResponseMessage {
273 text: format!("Error: {}", e),
274 timestamp: std::time::SystemTime::now()
275 .duration_since(std::time::UNIX_EPOCH)
276 .unwrap()
277 .as_secs(),
278 action_taken: false,
279 error: Some(e.to_string()),
280 })
281 }
282 }
283 }
284
285 async fn try_parse_and_publish_action(&mut self, ctx: &Context, response: &str) -> bool {
287 let json_str = if let Some(start) = response.find('{') {
289 if let Some(end) = response.rfind('}') {
290 &response[start..=end]
291 } else {
292 return false;
293 }
294 } else {
295 return false;
296 };
297
298 match serde_json::from_str::<ParsedAction>(json_str) {
300 Ok(action) => {
301 info!("đ¤ Parsed action: {:?}", action);
302 self.publish_action(ctx, action).await.unwrap_or_else(|e| {
303 warn!("Failed to publish action: {}", e);
304 });
305 true
306 }
307 Err(e) => {
308 info!("âšī¸ No structured action found ({})", e);
309 false
310 }
311 }
312 }
313
314 async fn publish_action(&mut self, ctx: &Context, action: ParsedAction) -> Result<()> {
316 let timestamp = std::time::SystemTime::now()
317 .duration_since(std::time::UNIX_EPOCH)?
318 .as_secs();
319
320 match &action {
322 ParsedAction::Navigate { .. } | ParsedAction::Motor { .. } => {
323 self.behavior_trigger.interrupt(ctx).await?;
324 }
325 _ => {}
326 }
327
328 match action {
329 ParsedAction::Navigate { goal } => {
330 let nav_goal = NavigationGoal {
331 x: goal.x,
332 y: goal.y,
333 theta: goal.theta,
334 timestamp,
335 };
336
337 let topic_path = Box::leak(self.config.topics.nav_goal_out.clone().into_boxed_str());
338 let topic = Topic::<NavigationGoal>::new(topic_path);
339 ctx.publish_to(topic, &nav_goal).await?;
340 info!("đ Published navigation goal: ({}, {})", goal.x, goal.y);
341 }
342 ParsedAction::Motor {
343 linear,
344 angular,
345 duration_secs,
346 } => {
347 let motor_cmd = MotorCommand {
348 linear,
349 angular,
350 duration_secs,
351 timestamp,
352 };
353
354 let topic_path = Box::leak(self.config.topics.motor_cmd_out.clone().into_boxed_str());
355 let topic = Topic::<MotorCommand>::new(topic_path);
356 ctx.publish_to(topic, &motor_cmd).await?;
357
358 if let Some(duration) = duration_secs {
359 info!(
360 "đŽ Published motor command: linear={}, angular={}, duration={}s",
361 linear, angular, duration
362 );
363
364 let ctx_clone = ctx.clone();
366 let topic_str = self.config.topics.motor_cmd_out.clone();
368 tokio::spawn(async move {
369 tokio::time::sleep(tokio::time::Duration::from_secs_f64(duration)).await;
370
371 let stop_cmd = MotorCommand {
372 linear: 0.0,
373 angular: 0.0,
374 duration_secs: None,
375 timestamp: std::time::SystemTime::now()
376 .duration_since(std::time::UNIX_EPOCH)
377 .unwrap()
378 .as_secs(),
379 };
380
381 let stop_topic_path = Box::leak(topic_str.into_boxed_str());
382 let stop_topic = Topic::<MotorCommand>::new(stop_topic_path);
383 if let Err(e) = ctx_clone.publish_to(stop_topic, &stop_cmd).await {
384 warn!("Failed to send stop command after duration: {}", e);
385 } else {
386 info!("đ Sent stop command after {}s duration", duration);
387 }
388 });
389 } else {
390 info!("đŽ Published motor command: linear={}, angular={}", linear, angular);
391 }
392 }
393 ParsedAction::Behavior { name, params } => {
394 let behavior_cmd = BehaviorCommand {
395 name: name.clone(),
396 params,
397 timestamp,
398 };
399
400 let topic_path = Box::leak(self.config.topics.behavior_out.clone().into_boxed_str());
401 let topic = Topic::<BehaviorCommand>::new(topic_path);
402 ctx.publish_to(topic, &behavior_cmd).await?;
403 info!("đ¤ Published behavior command: {}", name);
404 }
405 }
406
407 Ok(())
408 }
409
410 fn is_vision_query(&self, text: &str) -> bool {
412 let text_lower = text.to_lowercase();
413 text_lower.contains("see")
414 || text_lower.contains("look")
415 || text_lower.contains("visible")
416 || text_lower.contains("detect")
417 || text_lower.contains("object")
418 || text_lower.contains("person")
419 || text_lower.contains("describe")
420 || text_lower.contains("what")
421 || text_lower.contains("who")
422 || text_lower.contains("where")
423 || text_lower.contains("count")
424 || text_lower.contains("how many")
425 }
426
427 fn format_detections(&self, detections: &DetectionResult) -> String {
429 if detections.detections.is_empty() {
430 return "No objects detected in the current frame.".to_string();
431 }
432
433 let mut lines = vec![];
434 lines.push(format!(
435 "Detected {} object(s) in frame #{}:",
436 detections.detections.len(),
437 detections.frame_id
438 ));
439
440 for (idx, det) in detections.detections.iter().enumerate() {
441 lines.push(format!(
442 " {}. {} ({:.1}% confidence)",
443 idx + 1,
444 det.class_name,
445 det.confidence * 100.0
446 ));
447 }
448
449 lines.join("\n")
450 }
451}
452
453pub async fn run() -> Result<()> {
456 info!("đ¤ Starting LLM Command Node");
457
458 let ctx = Context::new("llm-command").await?;
460
461 ctx.start_health_reporting(|| async { HealthStatus::healthy() }).await?;
463
464 let config: OpenAIReasoningConfig = ctx.load_node_config("llm-command").await?;
466 info!("Configuration: {:?}", config);
467
468 match config.provider.as_str() {
470 "openai" => {
471 if env::var("OPENAI_API_KEY").is_err() {
472 anyhow::bail!(
473 "OPENAI_API_KEY environment variable not set. Please set it in your .env file or environment."
474 );
475 }
476 }
477 "claude" => {
478 if env::var("ANTHROPIC_API_KEY").is_err() {
479 anyhow::bail!(
480 "ANTHROPIC_API_KEY environment variable not set. Please set it in your .env file or environment."
481 );
482 }
483 }
484 "local" => {
485 info!("Using local LLM provider (Ollama)");
486 }
487 _ => {
488 anyhow::bail!("Unknown provider: {}", config.provider);
489 }
490 }
491
492 let mut node = OpenAIReasoningNode::new(config.clone())?;
494
495 info!(
496 "â
LLM Node initialized with {} provider ({})",
497 config.provider, config.llm_model
498 );
499
500 let command_topic_path = Box::leak(config.topics.command_in.clone().into_boxed_str());
502 let response_topic_path = Box::leak(config.topics.response_out.clone().into_boxed_str());
503
504 let command_topic = Topic::<CommandMessage>::new(command_topic_path);
505 let response_topic = Topic::<ResponseMessage>::new(response_topic_path);
506
507 let mut command_receiver = ctx.subscribe(command_topic).await?;
508
509 info!("đĄ Subscribed to: {}", command_topic_path);
510 info!("đ¤ Publishing responses to: {}", response_topic_path);
511
512 let mut detection_receiver = if config.vision_enabled {
514 let detection_topic_path: &'static str = Box::leak(config.topics.detections_in.clone().into_boxed_str());
515 let detection_topic = Topic::<DetectionResult>::new(detection_topic_path);
516 let receiver = ctx.subscribe(detection_topic).await?;
517 info!("đī¸ Subscribed to vision detections: {}", detection_topic_path);
518 Some(receiver)
519 } else {
520 info!("đī¸ Vision queries disabled");
521 None
522 };
523
524 info!("đ Entering main processing loop");
525
526 let ready_message = ResponseMessage {
528 text: "LLM command node ready".to_string(),
529 timestamp: std::time::SystemTime::now()
530 .duration_since(std::time::UNIX_EPOCH)
531 .unwrap()
532 .as_secs(),
533 action_taken: false,
534 error: None,
535 };
536 ctx.publish_to(response_topic, &ready_message).await?;
537 info!("đ¤ Published ready message to {}", response_topic_path);
538
539 loop {
541 tokio::select! {
542 Some(command_msg) = command_receiver.recv() => {
543 info!("đ¨ Received command: {}", command_msg.text);
544
545 match node.process_command(&ctx, &command_msg).await {
547 Ok(response) => {
548 ctx.publish_to(response_topic, &response).await?;
550 info!("đ¤ Published response");
551 }
552 Err(e) => {
553 warn!("â Error processing command: {}", e);
554
555 let error_response = ResponseMessage {
557 text: format!("Error: {}", e),
558 timestamp: std::time::SystemTime::now()
559 .duration_since(std::time::UNIX_EPOCH)
560 .unwrap()
561 .as_secs(),
562 action_taken: false,
563 error: Some(e.to_string()),
564 };
565 ctx.publish_to(response_topic, &error_response).await?;
566 }
567 }
568 }
569
570 Some(detection_msg) = async {
571 match &mut detection_receiver {
572 Some(receiver) => receiver.recv().await,
573 None => std::future::pending().await,
574 }
575 } => {
576 node.latest_detections = Some(detection_msg);
578 }
579 }
580 }
581}