//! LLM command node: turns natural-language commands into navigation, motor,
//! and behavior messages, optionally grounded in live object detections.

mod config;
pub use config::{OpenAIReasoningConfig, TopicConfig};
use anyhow::{Context as AnyhowContext, Result};
use mecha10_ai_llm::prelude::*;
use mecha10_core::behavior_interrupt::BehaviorInterruptTrigger;
use mecha10_core::messages::Message;
use mecha10_core::prelude::*;
use mecha10_core::topics::Topic;
use serde::{Deserialize, Serialize};
use std::env;
use tracing::{info, warn};
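
/// A natural-language command for the node to interpret.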
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandMessage {
pub text: String,
pub timestamp: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub user_id: Option<String>,
}
impl Message for CommandMessage {}
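
/// The node's reply to a command, noting whether a robot action was dispatched.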
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResponseMessage {
pub text: String,
pub timestamp: u64,
pub action_taken: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
impl Message for ResponseMessage {}
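
/// Target pose published when the LLM requests navigation.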
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NavigationGoal {
pub x: f64,
pub y: f64,
#[serde(default)]
pub theta: f64,
pub timestamp: u64,
}
impl Message for NavigationGoal {}
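
/// Direct velocity command; an optional duration schedules an automatic stop.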
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MotorCommand {
pub linear: f64,
pub angular: f64,
#[serde(skip_serializing_if = "Option::is_none")]
pub duration_secs: Option<f64>,
pub timestamp: u64,
}
impl Message for MotorCommand {}
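
/// Invocation of a named behavior with optional JSON parameters.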
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BehaviorCommand {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
pub timestamp: u64,
}
impl Message for BehaviorCommand {}
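
/// A single object detection from the vision pipeline.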
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Detection {
pub class_id: u32,
pub class_name: String,
pub confidence: f32,
}
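
/// One frame of detections, kept as context for vision queries.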
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetectionResult {
pub frame_id: u64,
pub timestamp: u64,
pub detections: Vec<Detection>,
pub inference_time_ms: f32,
pub model_name: String,
}
impl Message for DetectionResult {}
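
/// Structured action the LLM may embed as JSON in its response, tagged by
/// "action", e.g. `{"action": "motor", "linear": 0.5, "angular": 0.0}`.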
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
enum ParsedAction {
Navigate {
goal: NavigationGoalData,
},
Motor {
linear: f64,
angular: f64,
#[serde(skip_serializing_if = "Option::is_none")]
duration_secs: Option<f64>,
},
Behavior {
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
params: Option<serde_json::Value>,
},
}
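
/// Coordinate payload for the `navigate` action.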
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NavigationGoalData {
pub x: f64,
pub y: f64,
#[serde(default)]
pub theta: f64,
}
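
/// Seconds since the Unix epoch; the epoch is always in the past, so the
/// conversion cannot fail on a sane clock.
fn now_secs() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock set before the Unix epoch")
        .as_secs()
}

/// Bridges natural-language commands to robot actions through the configured
/// LLM backend, optionally grounding vision queries in live detections.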
pub struct OpenAIReasoningNode {
    config: OpenAIReasoningConfig,
    llm: LlmNode,
    latest_detections: Option<DetectionResult>,
    behavior_trigger: BehaviorInterruptTrigger,
    // Topic paths leaked once at construction: `Topic::new` needs a
    // `&'static str`, and leaking here avoids leaking on every publish.
    nav_goal_path: &'static str,
    motor_cmd_path: &'static str,
    behavior_path: &'static str,
}
impl OpenAIReasoningNode {
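    /// Builds the LLM client for the configured provider. Hosted providers
    /// require OPENAI_API_KEY or ANTHROPIC_API_KEY to be set.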
pub fn new(config: OpenAIReasoningConfig) -> Result<Self> {
let api_key = match config.provider.as_str() {
"openai" => env::var("OPENAI_API_KEY").context("OPENAI_API_KEY environment variable not set")?,
"claude" => env::var("ANTHROPIC_API_KEY").context("ANTHROPIC_API_KEY environment variable not set")?,
"local" => String::new(), _ => anyhow::bail!("Unknown provider: {}", config.provider),
};
let mut builder = match config.provider.as_str() {
"openai" => LlmNode::openai(),
"claude" => LlmNode::claude(),
"local" => LlmNode::local(),
_ => unreachable!(),
};
builder = builder
.with_model(&config.llm_model)
.with_temperature(config.temperature)
.with_max_tokens(config.max_tokens);
if !api_key.is_empty() {
builder = builder.with_api_key(api_key);
}
if let Some(ref system_prompt) = config.system_prompt {
builder = builder.with_system_prompt(system_prompt);
}
        let llm = builder.build()?;
        let behavior_trigger = BehaviorInterruptTrigger::new("llm-command", config.behavior_interrupt.clone());
        // Leak each topic path once here so later publishes can reuse the
        // `&'static str` that `Topic::new` requires, instead of leaking per action.
        let nav_goal_path: &'static str = Box::leak(config.topics.nav_goal_out.clone().into_boxed_str());
        let motor_cmd_path: &'static str = Box::leak(config.topics.motor_cmd_out.clone().into_boxed_str());
        let behavior_path: &'static str = Box::leak(config.topics.behavior_out.clone().into_boxed_str());
        Ok(Self {
            config,
            llm,
            latest_detections: None,
            behavior_trigger,
            nav_goal_path,
            motor_cmd_path,
            behavior_path,
        })
}
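
    /// Sends a command to the LLM, enriching vision queries with the latest
    /// detections, and returns the response along with any action taken.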
async fn process_command(&mut self, ctx: &Context, command: &CommandMessage) -> Result<ResponseMessage> {
info!("🧠 Processing command: {}", command.text);
self.llm.clear_messages();
let is_vision_query = self.is_vision_query(&command.text);
if is_vision_query && self.config.vision_enabled {
let prompt_with_context = if let Some(ref detections) = self.latest_detections {
let detection_text = self.format_detections(detections);
info!("👁️ Vision query detected - adding detection context");
format!("{}\n\nCurrent visual detections:\n{}", command.text, detection_text)
} else {
info!("👁️ Vision query detected but no detections available");
format!(
"{}\n\n(No visual detections available - object detector may not be running)",
command.text
)
};
self.llm.user(&prompt_with_context);
} else {
self.llm.user(&command.text);
}
match self.llm.tick(ctx).await {
Ok(_) => {
if let Some(response_text) = self.llm.last_response() {
let response_text = response_text.to_string();
info!("✅ LLM response: {}", response_text);
let action_taken = self.try_parse_and_publish_action(ctx, &response_text).await;
                    Ok(ResponseMessage {
                        text: response_text,
                        timestamp: now_secs(),
                        action_taken,
                        error: None,
                    })
} else {
                    Ok(ResponseMessage {
                        text: "No response generated".to_string(),
                        timestamp: now_secs(),
                        action_taken: false,
                        error: Some("Empty response from LLM".to_string()),
                    })
}
}
Err(e) => {
warn!("❌ LLM error: {}", e);
                Ok(ResponseMessage {
                    text: format!("Error: {}", e),
                    timestamp: now_secs(),
                    action_taken: false,
                    error: Some(e.to_string()),
                })
}
}
}
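
    /// Extracts the first JSON object embedded in the response and, if it
    /// parses as a `ParsedAction`, publishes it. Returns whether an action
    /// was dispatched.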
async fn try_parse_and_publish_action(&mut self, ctx: &Context, response: &str) -> bool {
        // Crude extraction: take the span from the first '{' to the last '}'.
        // The `start < end` guard avoids a slice panic on inputs like "} {".
        let json_str = match (response.find('{'), response.rfind('}')) {
            (Some(start), Some(end)) if start < end => &response[start..=end],
            _ => return false,
        };
match serde_json::from_str::<ParsedAction>(json_str) {
Ok(action) => {
info!("📤 Parsed action: {:?}", action);
self.publish_action(ctx, action).await.unwrap_or_else(|e| {
warn!("Failed to publish action: {}", e);
});
true
}
Err(e) => {
info!("ℹ️ No structured action found ({})", e);
false
}
}
}
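
    /// Publishes a parsed action to its configured topic. Movement actions
    /// first interrupt any running behavior.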
async fn publish_action(&mut self, ctx: &Context, action: ParsedAction) -> Result<()> {
        let timestamp = now_secs();
match &action {
ParsedAction::Navigate { .. } | ParsedAction::Motor { .. } => {
self.behavior_trigger.interrupt(ctx).await?;
}
_ => {}
}
match action {
ParsedAction::Navigate { goal } => {
let nav_goal = NavigationGoal {
x: goal.x,
y: goal.y,
theta: goal.theta,
timestamp,
};
                let topic = Topic::<NavigationGoal>::new(self.nav_goal_path);
ctx.publish_to(topic, &nav_goal).await?;
info!("📍 Published navigation goal: ({}, {})", goal.x, goal.y);
}
ParsedAction::Motor {
linear,
angular,
duration_secs,
} => {
let motor_cmd = MotorCommand {
linear,
angular,
duration_secs,
timestamp,
};
                let topic = Topic::<MotorCommand>::new(self.motor_cmd_path);
ctx.publish_to(topic, &motor_cmd).await?;
if let Some(duration) = duration_secs {
info!(
"🎮 Published motor command: linear={}, angular={}, duration={}s",
linear, angular, duration
);
                    let ctx_clone = ctx.clone();
                    // `&'static str` is Copy, so it can move into the task freely.
                    let motor_cmd_path = self.motor_cmd_path;
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs_f64(duration)).await;
                        let stop_cmd = MotorCommand {
                            linear: 0.0,
                            angular: 0.0,
                            duration_secs: None,
                            timestamp: now_secs(),
                        };
                        let stop_topic = Topic::<MotorCommand>::new(motor_cmd_path);
if let Err(e) = ctx_clone.publish_to(stop_topic, &stop_cmd).await {
warn!("Failed to send stop command after duration: {}", e);
} else {
info!("🛑 Sent stop command after {}s duration", duration);
}
});
} else {
info!("🎮 Published motor command: linear={}, angular={}", linear, angular);
}
}
ParsedAction::Behavior { name, params } => {
let behavior_cmd = BehaviorCommand {
name: name.clone(),
params,
timestamp,
};
                let topic = Topic::<BehaviorCommand>::new(self.behavior_path);
ctx.publish_to(topic, &behavior_cmd).await?;
info!("🤖 Published behavior command: {}", name);
}
}
Ok(())
}
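
    /// Heuristic check for whether a command likely asks about what the
    /// robot can currently see.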
    fn is_vision_query(&self, text: &str) -> bool {
        const VISION_KEYWORDS: &[&str] = &[
            "see", "look", "visible", "detect", "object", "person",
            "describe", "what", "who", "where", "count", "how many",
        ];
        let text_lower = text.to_lowercase();
        VISION_KEYWORDS.iter().any(|keyword| text_lower.contains(keyword))
    }
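
    /// Renders a detection result as a human-readable list for prompt context.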
fn format_detections(&self, detections: &DetectionResult) -> String {
if detections.detections.is_empty() {
return "No objects detected in the current frame.".to_string();
}
let mut lines = vec![];
lines.push(format!(
"Detected {} object(s) in frame #{}:",
detections.detections.len(),
detections.frame_id
));
for (idx, det) in detections.detections.iter().enumerate() {
lines.push(format!(
" {}. {} ({:.1}% confidence)",
idx + 1,
det.class_name,
det.confidence * 100.0
));
}
lines.join("\n")
}
}
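
/// Node entry point: validates provider credentials, subscribes to command
/// (and optionally detection) topics, and processes commands until shutdown.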
pub async fn run() -> Result<()> {
info!("🤖 Starting LLM Command Node");
let ctx = Context::new("llm-command").await?;
let config: OpenAIReasoningConfig = ctx.load_node_config("llm-command").await?;
info!("Configuration: {:?}", config);
match config.provider.as_str() {
"openai" => {
if env::var("OPENAI_API_KEY").is_err() {
anyhow::bail!(
"OPENAI_API_KEY environment variable not set. Please set it in your .env file or environment."
);
}
}
"claude" => {
if env::var("ANTHROPIC_API_KEY").is_err() {
anyhow::bail!(
"ANTHROPIC_API_KEY environment variable not set. Please set it in your .env file or environment."
);
}
}
"local" => {
info!("Using local LLM provider (Ollama)");
}
_ => {
anyhow::bail!("Unknown provider: {}", config.provider);
}
}
let mut node = OpenAIReasoningNode::new(config.clone())?;
info!(
"✅ LLM Node initialized with {} provider ({})",
config.provider, config.llm_model
);
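    // These one-time leaks are fine: the topic paths live for the life of the process.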
let command_topic_path = Box::leak(config.topics.command_in.clone().into_boxed_str());
let response_topic_path = Box::leak(config.topics.response_out.clone().into_boxed_str());
let command_topic = Topic::<CommandMessage>::new(command_topic_path);
let response_topic = Topic::<ResponseMessage>::new(response_topic_path);
let mut command_receiver = ctx.subscribe(command_topic).await?;
info!("📡 Subscribed to: {}", command_topic_path);
info!("📤 Publishing responses to: {}", response_topic_path);
let mut detection_receiver = if config.vision_enabled {
let detection_topic_path: &'static str = Box::leak(config.topics.detections_in.clone().into_boxed_str());
let detection_topic = Topic::<DetectionResult>::new(detection_topic_path);
let receiver = ctx.subscribe(detection_topic).await?;
info!("👁️ Subscribed to vision detections: {}", detection_topic_path);
Some(receiver)
} else {
info!("👁️ Vision queries disabled");
None
};
info!("🔄 Entering main processing loop");
    let ready_message = ResponseMessage {
        text: "LLM command node ready".to_string(),
        timestamp: now_secs(),
        action_taken: false,
        error: None,
    };
ctx.publish_to(response_topic, &ready_message).await?;
info!("📤 Published ready message to {}", response_topic_path);
loop {
tokio::select! {
Some(command_msg) = command_receiver.recv() => {
info!("📨 Received command: {}", command_msg.text);
match node.process_command(&ctx, &command_msg).await {
Ok(response) => {
ctx.publish_to(response_topic, &response).await?;
info!("📤 Published response");
}
Err(e) => {
warn!("❌ Error processing command: {}", e);
                    let error_response = ResponseMessage {
                        text: format!("Error: {}", e),
                        timestamp: now_secs(),
                        action_taken: false,
                        error: Some(e.to_string()),
                    };
ctx.publish_to(response_topic, &error_response).await?;
}
}
}
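            // Poll detections only when vision is enabled; otherwise park this
            // branch on a future that never resolves.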
Some(detection_msg) = async {
match &mut detection_receiver {
Some(receiver) => receiver.recv().await,
None => std::future::pending().await,
}
} => {
node.latest_detections = Some(detection_msg);
}
}
}
}