1use std::collections::HashMap;
2use std::fs;
3use std::io::Write;
4use std::path::PathBuf;
5use std::sync::{Arc, OnceLock};
6
7use serde::{Deserialize, Serialize};
8use tokio::sync::Mutex as TokioMutex;
9use tokio::sync::Notify;
10
11use crate::agent_config::{load_agent_config, AgentConfig, TradingMode};
12use crate::agent_history::AgentHistoryStore;
13use crate::claude::AgentDecision;
14use crate::tools::{
15 CancelOrderTool, DoNothingTool, GetOpenOrdersTool, GetPositionsTool, GetTradeHistoryTool,
16 PlaceOrderTool, SetStopLossTool, SetTakeProfitTool,
17};
18use hyper_agent_core::account_state_ext::{account_state_from_exchange, account_state_from_paper};
19use hyper_agent_core::config::AppConfig;
20use hyper_agent_core::executor::{DryRunExecutor, PaperExecutor};
21use hyper_agent_core::live_executor::{AssetMetaCache, LiveExecutor};
22use hyper_agent_core::pipeline::{OrderPipeline, PipelineContext, PipelineError};
23use hyper_agent_core::pipeline_stages::{LoggerStage, OrderRouterStage, RiskGuardStage};
24use hyper_agent_core::position_manager::PositionManager;
25use hyper_agent_core::signal::{Side, SignalAction, SignalSource, TradeSignal};
26
27use hyper_exchange::{ExchangeClient, Signer};
28use hyper_risk::risk::{AccountState, RiskGuard};
29use hyper_strategy::strategy_composer::ComposerProfile;
30#[allow(deprecated)]
31use hyper_ta::technical_analysis::{calculate_indicators, format_technical_summary};
32use motosan_chat_agent::{AgentLoop as MotosanAgentLoop, Message as AgentMessage};
33use motosan_chat_ai::MotosanAiClient;
34use motosan_chat_core::{Channel, ChatType, IncomingEvent, IncomingEventKind, Thread};
35
36#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub enum LoopState {
44 Running,
45 Paused,
46 Stopped,
47 Error,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "camelCase")]
53pub struct LoopStatus {
54 pub agent_id: String,
55 pub state: LoopState,
56 pub interval_minutes: u64,
57 pub iteration_count: u64,
58 pub last_iteration_at: Option<String>,
59 pub last_error: Option<String>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "camelCase")]
65pub struct LoopConfig {
66 pub agent_id: String,
67 pub interval_minutes: u64,
68 pub auto_start: bool,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct AgentLoopLog {
75 pub agent_id: String,
76 pub iteration: u64,
77 pub timestamp: String,
78 pub phase: String,
79 pub success: bool,
80 pub message: String,
81 pub decision_summary: Option<String>,
82 pub tool_turn_count: u32,
83 pub risk_blocked: bool,
84 pub order_executed: bool,
85}
86
87#[derive(Clone)]
97pub struct WalletContext {
98 pub agent_address: String,
100 pub is_mainnet: bool,
102 pub vault_address: Option<String>,
104 pub signer: Arc<dyn Signer>,
106}
107
108struct LoopHandle {
113 status: Arc<TokioMutex<LoopStatus>>,
114 cancel: tokio::sync::watch::Sender<bool>,
116 trigger: Arc<Notify>,
118}
119
120pub struct AgentLoopManager {
126 loops: TokioMutex<HashMap<String, LoopHandle>>,
127}
128
129impl AgentLoopManager {
130 pub fn new() -> Self {
131 Self {
132 loops: TokioMutex::new(HashMap::new()),
133 }
134 }
135
136 pub async fn start_loop(
140 &self,
141 agent_id: String,
142 interval_minutes: u64,
143 wallet_context: Option<WalletContext>,
144 ) -> Result<(), String> {
145 let mut loops = self.loops.lock().await;
146
147 if let Some(handle) = loops.get(&agent_id) {
149 let status = handle.status.lock().await;
150 if status.state == LoopState::Running || status.state == LoopState::Paused {
151 return Err(format!(
152 "Agent loop for '{}' is already {} -- stop it first",
153 agent_id,
154 match status.state {
155 LoopState::Running => "running",
156 LoopState::Paused => "paused",
157 _ => "active",
158 }
159 ));
160 }
161 }
162
163 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
164 let trigger = Arc::new(Notify::new());
165
166 let status = Arc::new(TokioMutex::new(LoopStatus {
167 agent_id: agent_id.clone(),
168 state: LoopState::Running,
169 interval_minutes,
170 iteration_count: 0,
171 last_iteration_at: None,
172 last_error: None,
173 }));
174
175 save_loop_config(&LoopConfig {
177 agent_id: agent_id.clone(),
178 interval_minutes,
179 auto_start: true,
180 });
181
182 let status_clone = Arc::clone(&status);
184 let trigger_clone = Arc::clone(&trigger);
185 let aid = agent_id.clone();
186 let wc = wallet_context.clone();
187 tokio::spawn(async move {
188 run_loop(
189 aid,
190 interval_minutes,
191 status_clone,
192 cancel_rx,
193 trigger_clone,
194 wc,
195 )
196 .await;
197 });
198
199 loops.insert(
200 agent_id,
201 LoopHandle {
202 status,
203 cancel: cancel_tx,
204 trigger,
205 },
206 );
207
208 Ok(())
209 }
210
211 pub async fn stop_loop(&self, agent_id: &str) -> Result<(), String> {
213 let mut loops = self.loops.lock().await;
214 let handle = loops
215 .remove(agent_id)
216 .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
217
218 let _ = handle.cancel.send(true);
220 handle.trigger.notify_one();
222
223 {
225 let mut status = handle.status.lock().await;
226 status.state = LoopState::Stopped;
227 }
228
229 save_loop_config(&LoopConfig {
231 agent_id: agent_id.to_string(),
232 interval_minutes: handle.status.lock().await.interval_minutes,
233 auto_start: false,
234 });
235
236 Ok(())
237 }
238
239 pub async fn pause_loop(&self, agent_id: &str) -> Result<(), String> {
241 let loops = self.loops.lock().await;
242 let handle = loops
243 .get(agent_id)
244 .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
245
246 let mut status = handle.status.lock().await;
247 if status.state != LoopState::Running {
248 return Err(format!(
249 "Cannot pause: agent loop is {:?}, not Running",
250 status.state
251 ));
252 }
253 status.state = LoopState::Paused;
254 Ok(())
255 }
256
257 pub async fn resume_loop(&self, agent_id: &str) -> Result<(), String> {
259 let loops = self.loops.lock().await;
260 let handle = loops
261 .get(agent_id)
262 .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
263
264 let mut status = handle.status.lock().await;
265 if status.state != LoopState::Paused {
266 return Err(format!(
267 "Cannot resume: agent loop is {:?}, not Paused",
268 status.state
269 ));
270 }
271 status.state = LoopState::Running;
272 handle.trigger.notify_one();
274 Ok(())
275 }
276
277 pub async fn trigger_once(&self, agent_id: &str) -> Result<(), String> {
280 let loops = self.loops.lock().await;
281 if let Some(handle) = loops.get(agent_id) {
282 handle.trigger.notify_one();
284 Ok(())
285 } else {
286 drop(loops);
288 let log = execute_single_iteration(agent_id, 0, None).await;
289 write_log_entry(&log);
290 if log.success {
291 Ok(())
292 } else {
293 Err(log.message)
294 }
295 }
296 }
297
298 pub async fn get_status(&self, agent_id: &str) -> Result<LoopStatus, String> {
300 let loops = self.loops.lock().await;
301 if let Some(handle) = loops.get(agent_id) {
302 Ok(handle.status.lock().await.clone())
303 } else {
304 Ok(LoopStatus {
306 agent_id: agent_id.to_string(),
307 state: LoopState::Stopped,
308 interval_minutes: 15,
309 iteration_count: 0,
310 last_iteration_at: None,
311 last_error: None,
312 })
313 }
314 }
315}
316
317async fn run_loop(
322 agent_id: String,
323 interval_minutes: u64,
324 status: Arc<TokioMutex<LoopStatus>>,
325 mut cancel_rx: tokio::sync::watch::Receiver<bool>,
326 trigger: Arc<Notify>,
327 wallet_context: Option<WalletContext>,
328) {
329 let interval = tokio::time::Duration::from_secs(interval_minutes * 60);
330
331 loop {
332 if *cancel_rx.borrow() {
334 break;
335 }
336
337 {
339 let s = status.lock().await;
340 if s.state == LoopState::Paused {
341 drop(s);
342 tokio::select! {
344 _ = trigger.notified() => continue,
345 _ = cancel_rx.changed() => break,
346 }
347 }
348 }
349
350 let iteration = {
352 let s = status.lock().await;
353 s.iteration_count
354 };
355
356 let log = execute_single_iteration(&agent_id, iteration, wallet_context.clone()).await;
357 write_log_entry(&log);
358
359 {
361 let mut s = status.lock().await;
362 if *cancel_rx.borrow() {
363 break;
364 }
365 s.iteration_count += 1;
366 s.last_iteration_at = Some(log.timestamp.clone());
367 if log.success {
368 s.last_error = None;
369 } else {
370 s.last_error = Some(log.message.clone());
371 }
372 }
373
374 tokio::select! {
376 _ = tokio::time::sleep(interval) => {},
377 _ = trigger.notified() => {},
378 _ = cancel_rx.changed() => break,
379 }
380 }
381
382 let mut s = status.lock().await;
384 s.state = LoopState::Stopped;
385}
386
387fn decision_to_trade_signal(
396 decision: &AgentDecision,
397 symbol: &str,
398 agent_id: &str,
399) -> Option<TradeSignal> {
400 for tc in &decision.tool_calls {
401 match tc.tool_name.as_str() {
402 "place_order" => {
403 let side = tc
404 .tool_input
405 .get("is_buy")
406 .and_then(|v| v.as_bool())
407 .map(|b| if b { Side::Buy } else { Side::Sell })
408 .unwrap_or(Side::Buy);
409 let size = tc
410 .tool_input
411 .get("size")
412 .and_then(|v| {
413 v.as_str()
414 .and_then(|s| s.parse().ok())
415 .or_else(|| v.as_f64())
416 })
417 .unwrap_or(0.0);
418 let price = tc.tool_input.get("price").and_then(|v| {
419 v.as_str()
420 .and_then(|s| s.parse().ok())
421 .or_else(|| v.as_f64())
422 });
423
424 return Some(TradeSignal {
425 id: uuid::Uuid::new_v4().to_string(),
426 timestamp: std::time::SystemTime::now()
427 .duration_since(std::time::UNIX_EPOCH)
428 .unwrap_or_default()
429 .as_secs(),
430 source: SignalSource::Playbook {
431 strategy_id: agent_id.to_string(),
432 regime: "claude".to_string(),
433 },
434 symbol: symbol.to_string(),
435 action: SignalAction::Open { side, size, price },
436 reason: decision.reasoning.clone(),
437 });
438 }
439 "do_nothing" => return None,
440 _ => continue,
441 }
442 }
443 None
444}
445
446fn error_log(
448 agent_id: &str,
449 iteration: u64,
450 timestamp: String,
451 phase: &str,
452 message: &str,
453) -> AgentLoopLog {
454 AgentLoopLog {
455 agent_id: agent_id.to_string(),
456 iteration,
457 timestamp,
458 phase: phase.to_string(),
459 success: false,
460 message: message.to_string(),
461 decision_summary: None,
462 tool_turn_count: 0,
463 risk_blocked: false,
464 order_executed: false,
465 }
466}
467
468async fn execute_single_iteration(
471 agent_id: &str,
472 iteration: u64,
473 wallet_context: Option<WalletContext>,
474) -> AgentLoopLog {
475 let timestamp = chrono::Utc::now().to_rfc3339();
476
477 let config = load_agent_config(agent_id).unwrap_or_else(AgentConfig::default);
479
480 let is_mainnet = true;
481 let fetcher = hyper_market::market_data::MarketDataFetcher::new(is_mainnet);
482
483 let trading_pairs = if config.trading_pairs.is_empty() {
485 vec!["BTC-PERP".to_string()]
486 } else {
487 config.trading_pairs.clone()
488 };
489 let primary_symbol = trading_pairs[0].clone();
490
491 let candles = match fetcher
492 .fetch_candles_with_params(&primary_symbol, "1h", 200)
493 .await
494 {
495 Ok(c) if !c.is_empty() => c,
496 Ok(_) => {
497 return error_log(
498 agent_id,
499 iteration,
500 timestamp,
501 "fetch_market_data",
502 "No candles returned from exchange",
503 );
504 }
505 Err(e) => {
506 return error_log(
507 agent_id,
508 iteration,
509 timestamp,
510 "fetch_market_data",
511 &format!("Failed to fetch market data: {}", e),
512 );
513 }
514 };
515
516 #[allow(deprecated)]
518 let indicators = calculate_indicators(&candles);
519 let current_price = candles.last().map(|c| c.close);
520 let ta_summary = format_technical_summary(&primary_symbol, &indicators, current_price);
521
522 let composer_analysis = if let Some(ref profile_str) = config.composer_profile {
524 if let Some(profile) = parse_composer_profile(profile_str) {
525 let composer =
526 hyper_strategy::strategy_composer::StrategyComposer::new(profile, &primary_symbol);
527 let volume_context =
528 hyper_risk::risk_defaults::analyze_volume_context(&candles, &indicators);
529 let signal = composer.compose_signals(&indicators, None, &volume_context);
530 Some(composer.format_for_claude(&signal))
531 } else {
532 None
533 }
534 } else {
535 None
536 };
537
538 let funding_rate_info =
540 match crate::tools::fetch_funding_rate(&primary_symbol, is_mainnet).await {
541 Ok(info) => Some(info),
542 Err(e) => {
543 eprintln!(
544 "[agent_loop] Failed to fetch funding rate for {}: {}",
545 primary_symbol, e
546 );
547 None
548 }
549 };
550
551 let mut user_message = String::with_capacity(4096);
553 user_message.push_str(&format!("## Market Analysis for {}\n\n", primary_symbol));
554 user_message.push_str(&ta_summary);
555
556 if let Some(ref composer_text) = composer_analysis {
557 user_message.push_str("\n\n## Strategy Composer Analysis\n");
558 user_message.push_str(composer_text);
559 }
560
561 if let Some(price) = current_price {
562 user_message.push_str(&format!("\n\nCurrent Price: ${:.2}", price));
563 }
564
565 if let Some(ref fr_info) = funding_rate_info {
566 user_message.push_str("\n\n## Funding Rate");
567 if let Some(rate_pct) = fr_info.get("funding_rate_pct").and_then(|v| v.as_str()) {
568 user_message.push_str(&format!("\nCurrent Funding Rate: {}", rate_pct));
569 }
570 if let Some(annual_pct) = fr_info.get("annualized_rate_pct").and_then(|v| v.as_str()) {
571 user_message.push_str(&format!("\nAnnualized Rate: {}", annual_pct));
572 }
573 if let Some(direction) = fr_info.get("direction").and_then(|v| v.as_str()) {
574 let direction_desc = match direction {
575 "longs_pay_shorts" => "Longs pay shorts (bullish sentiment)",
576 "shorts_pay_longs" => "Shorts pay longs (bearish sentiment)",
577 _ => "Neutral",
578 };
579 user_message.push_str(&format!("\nDirection: {}", direction_desc));
580 }
581 }
582
583 user_message.push_str("\n\n## Current Positions\nNo position data available in loop context.");
584
585 let mode_str = match config.trading_mode {
586 TradingMode::Paper => "PAPER TRADING (simulated)",
587 TradingMode::Live => "LIVE TRADING (real orders)",
588 };
589 user_message.push_str(&format!("\n\nTrading Mode: {}", mode_str));
590 user_message.push_str(&format!(
591 "\nMax Position Size: ${:.2}",
592 config.max_position_size_usd
593 ));
594 user_message.push_str(
595 "\n\nPlease analyze the market and decide: use place_order to trade, or do_nothing if conditions are not favorable.",
596 );
597
598 let system_prompt = config.system_prompt.clone();
599
600 let llm_result = match call_claude_real(
602 agent_id,
603 &system_prompt,
604 &user_message,
605 config.max_retries,
606 config.max_tool_turns,
607 )
608 .await
609 {
610 Ok(outcome) => outcome,
611 Err(e) => {
612 eprintln!("[agent_loop] Claude API call failed: {}", e);
613 return error_log(
614 agent_id,
615 iteration,
616 timestamp,
617 "call_claude",
618 &format!("Claude API error: {}", e),
619 );
620 }
621 };
622
623 let decision = &llm_result.decision;
624
625 let decision_summary = format!(
626 "reasoning={} tool_calls={} confidence={:.2}",
627 &decision.reasoning[..decision.reasoning.len().min(200)],
628 decision.tool_calls.len(),
629 decision.confidence,
630 );
631
632 let signal = decision_to_trade_signal(decision, &primary_symbol, agent_id);
634
635 match signal {
636 Some(sig) => {
637 let db_dir = {
639 let mut p = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
640 p.push("hyper-agent");
641 p.push("positions");
642 let _ = fs::create_dir_all(&p);
643 p
644 };
645 let db_path = db_dir.join(format!("{}.db", agent_id));
646 let pm = match PositionManager::new(db_path.to_str().unwrap_or("positions.db")) {
647 Ok(pm) => Arc::new(pm),
648 Err(e) => {
649 return error_log(
650 agent_id,
651 iteration,
652 timestamp,
653 "pipeline_init",
654 &format!("Failed to open position DB: {}", e),
655 );
656 }
657 };
658
659 let paper_exec = Arc::new(PaperExecutor::new(Arc::clone(&pm)));
660 let dry_run = Arc::new(DryRunExecutor);
661
662 let live_exec: Option<Arc<dyn hyper_agent_core::executor::OrderSubmitter>> =
664 if matches!(config.trading_mode, TradingMode::Live) {
665 if let Some(ref wc) = wallet_context {
666 let client = ExchangeClient::new(wc.is_mainnet);
667 match AssetMetaCache::fetch(&client).await {
668 Ok(meta_cache) => {
669 let executor = LiveExecutor::new(
670 Arc::clone(&wc.signer),
671 wc.agent_address.clone(),
672 wc.vault_address.clone(),
673 wc.is_mainnet,
674 client,
675 meta_cache,
676 );
677 Some(Arc::new(executor))
678 }
679 Err(e) => {
680 eprintln!(
681 "[agent_loop] Failed to fetch asset meta for live mode, \
682 falling back to paper: {}",
683 e
684 );
685 None
686 }
687 }
688 } else {
689 None
690 }
691 } else {
692 None
693 };
694
695 let pipeline = OrderPipeline::new(vec![
696 Box::new(RiskGuardStage::new()),
697 Box::new(OrderRouterStage::new(
698 paper_exec,
699 live_exec.clone(),
700 dry_run,
701 )),
702 Box::new(LoggerStage),
703 ]);
704
705 let (account_state, mode) = if live_exec.is_some() {
707 if let Some(ref wc) = wallet_context {
708 let client = ExchangeClient::new(wc.is_mainnet);
709 match account_state_from_exchange(&client, &wc.agent_address).await {
710 Ok(state) => (state, hyper_agent_core::pipeline::TradingMode::Live),
711 Err(e) => {
712 eprintln!(
713 "[agent_loop] Failed to fetch exchange account state, \
714 falling back to paper: {}",
715 e
716 );
717 (
718 account_state_from_paper(&pm).await,
719 hyper_agent_core::pipeline::TradingMode::Paper,
720 )
721 }
722 }
723 } else {
724 (
725 account_state_from_paper(&pm).await,
726 hyper_agent_core::pipeline::TradingMode::Paper,
727 )
728 }
729 } else {
730 (
731 account_state_from_paper(&pm).await,
732 hyper_agent_core::pipeline::TradingMode::Paper,
733 )
734 };
735
736 let mut ctx = PipelineContext {
737 mode,
738 account_state,
739 position_manager: pm,
740 config: Arc::new(AppConfig::default()),
741 execution_results: vec![],
742 };
743
744 match pipeline.execute(sig, &mut ctx).await {
745 Ok(_) => AgentLoopLog {
746 agent_id: agent_id.to_string(),
747 iteration,
748 timestamp,
749 phase: "complete".to_string(),
750 success: true,
751 message: "Order executed successfully via pipeline".to_string(),
752 decision_summary: Some(decision_summary),
753 tool_turn_count: llm_result.tool_turn_count,
754 risk_blocked: false,
755 order_executed: true,
756 },
757 Err(PipelineError::RiskBlocked(msg)) => AgentLoopLog {
758 agent_id: agent_id.to_string(),
759 iteration,
760 timestamp,
761 phase: "complete".to_string(),
762 success: true,
763 message: format!("Risk check blocked execution: {}", msg),
764 decision_summary: Some(decision_summary),
765 tool_turn_count: llm_result.tool_turn_count,
766 risk_blocked: true,
767 order_executed: false,
768 },
769 Err(PipelineError::CircuitBreakerTripped) => AgentLoopLog {
770 agent_id: agent_id.to_string(),
771 iteration,
772 timestamp,
773 phase: "complete".to_string(),
774 success: true,
775 message: "Circuit breaker tripped -- positions auto-closed".to_string(),
776 decision_summary: Some(decision_summary),
777 tool_turn_count: llm_result.tool_turn_count,
778 risk_blocked: true,
779 order_executed: false,
780 },
781 Err(e) => error_log(agent_id, iteration, timestamp, "pipeline", &e.to_string()),
782 }
783 }
784 None => {
785 let truncated_reasoning = &decision.reasoning[..decision.reasoning.len().min(200)];
787 AgentLoopLog {
788 agent_id: agent_id.to_string(),
789 iteration,
790 timestamp,
791 phase: "complete".to_string(),
792 success: true,
793 message: "No trade signal -- hold".to_string(),
794 decision_summary: Some(truncated_reasoning.to_string()),
795 tool_turn_count: llm_result.tool_turn_count,
796 risk_blocked: false,
797 order_executed: false,
798 }
799 }
800 }
801}
802
803pub fn parse_composer_profile(profile: &str) -> Option<ComposerProfile> {
809 match profile.to_lowercase().as_str() {
810 "conservative" => Some(ComposerProfile::Conservative),
811 "all_weather" | "allweather" => Some(ComposerProfile::AllWeather),
812 "turtle_system" | "turtlesystem" | "turtle" => Some(ComposerProfile::TurtleSystem),
813 _ => None,
814 }
815}
816
817fn extract_confidence_from_text(text: &str) -> Option<f64> {
818 let lower = text.to_lowercase();
819 for line in lower.lines() {
820 let line = line.trim();
821 if !line.contains("confidence") {
822 continue;
823 }
824 for part in line.split_whitespace() {
825 if let Ok(value) = part
826 .trim_matches(|c: char| !c.is_ascii_digit() && c != '.')
827 .parse::<f64>()
828 {
829 if (0.0..=1.0).contains(&value) {
830 return Some(value);
831 }
832 }
833 }
834 }
835 None
836}
837
838type ThinkingEventEmitter = Arc<dyn Fn(String, String) + Send + Sync>;
843static THINKING_EVENT_EMITTER: OnceLock<ThinkingEventEmitter> = OnceLock::new();
844
845pub fn set_thinking_event_emitter(emitter: ThinkingEventEmitter) {
846 let _ = THINKING_EVENT_EMITTER.set(emitter);
847}
848
849fn emit_thinking_event(agent_id: &str, chunk: &str) {
850 if let Some(emitter) = THINKING_EVENT_EMITTER.get() {
851 emitter(agent_id.to_string(), chunk.to_string());
852 }
853}
854
855static HISTORY_STORE: OnceLock<std::sync::Mutex<AgentHistoryStore>> = OnceLock::new();
860
861pub fn init_history_store(db_path: &str) {
866 let _ = HISTORY_STORE.get_or_init(|| {
867 let store = AgentHistoryStore::new(db_path).expect("Failed to open agent history database");
868 std::sync::Mutex::new(store)
869 });
870}
871
872fn persist_history(log: &AgentLoopLog) {
874 if let Some(store) = HISTORY_STORE.get() {
875 if let Ok(guard) = store.lock() {
876 if let Err(e) = guard.insert(log) {
877 tracing::warn!(error = %e, "Failed to persist agent history entry");
878 }
879 }
880 }
881}
882
883pub fn query_history(agent_id: &str, limit: usize) -> Vec<AgentLoopLog> {
885 HISTORY_STORE
886 .get()
887 .and_then(|store| store.lock().ok())
888 .and_then(|guard| guard.list(agent_id, limit).ok())
889 .unwrap_or_default()
890}
891
892struct LoopNoopChannel {
897 agent_id: String,
898}
899
900#[async_trait::async_trait]
901impl Channel for LoopNoopChannel {
902 fn name(&self) -> &str {
903 "agent-loop"
904 }
905
906 async fn listen(
907 &self,
908 _tx: tokio::sync::mpsc::Sender<IncomingEvent>,
909 ) -> motosan_chat_core::Result<()> {
910 Ok(())
911 }
912
913 async fn stream_chunk(&self, _user_id: &str, _chunk: &str) -> motosan_chat_core::Result<()> {
914 emit_thinking_event(&self.agent_id, _chunk);
915 Ok(())
916 }
917
918 async fn stream_done(&self, _user_id: &str) -> motosan_chat_core::Result<()> {
919 Ok(())
920 }
921
922 async fn send(&self, _user_id: &str, _text: &str) -> motosan_chat_core::Result<()> {
923 Ok(())
924 }
925}
926
927#[derive(Debug, Clone)]
932struct LlmCallResult {
933 decision: AgentDecision,
934 tool_turn_count: u32,
935}
936
937async fn call_claude_real(
939 agent_id: &str,
940 system_prompt: &str,
941 user_message: &str,
942 max_retries: u32,
943 max_tool_turns: u32,
944) -> Result<LlmCallResult, String> {
945 let cred_resolver = hyper_agent_core::credentials::CredentialResolver::new(
946 hyper_agent_core::config::CredentialsSection::default(),
947 );
948 let api_key = cred_resolver
949 .anthropic_key()
950 .ok_or_else(|| "No Anthropic API key found (set ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN env var, or configure in TOML)".to_string())?;
951
952 let retry_policy = motosan_ai::RetryPolicy::new()
953 .max_retries(max_retries)
954 .base_delay_ms(1_000)
955 .max_delay_ms(30_000)
956 .jitter(true)
957 .respect_retry_after(true);
958
959 let ai_client = motosan_ai::Client::builder()
960 .provider(motosan_ai::Provider::Anthropic)
961 .api_key(api_key)
962 .model(crate::claude::ClaudeModel::Sonnet4.as_str())
963 .retry_policy(retry_policy)
964 .build()
965 .map_err(|e| format!("Claude client build failed: {}", e))?;
966
967 let llm_client = MotosanAiClient::new(ai_client).with_system(system_prompt);
968
969 let risk_config = hyper_risk::risk::get_risk_config_sync();
970 let account_state = AccountState::default();
971
972 let tool_pm = {
974 let mut p = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
975 p.push("hyper-agent");
976 p.push("positions");
977 let _ = std::fs::create_dir_all(&p);
978 p.push(format!("{}.db", agent_id));
979 Arc::new(
980 PositionManager::new(p.to_str().unwrap_or("positions.db"))
981 .map_err(|e| format!("Failed to open position DB for tools: {}", e))?,
982 )
983 };
984
985 let loop_engine = MotosanAgentLoop::builder()
986 .tool(GetPositionsTool::new(Arc::clone(&tool_pm)))
987 .tool(PlaceOrderTool::new(
988 RiskGuard::new(risk_config.clone()),
989 account_state.clone(),
990 ))
991 .tool(CancelOrderTool::new(
992 RiskGuard::new(risk_config),
993 account_state,
994 ))
995 .tool(DoNothingTool)
996 .tool(SetStopLossTool)
997 .tool(SetTakeProfitTool)
998 .tool(GetOpenOrdersTool::new(true))
999 .tool(GetTradeHistoryTool::new(true))
1000 .max_iterations(max_tool_turns as usize)
1001 .build();
1002
1003 let event = IncomingEvent {
1004 id: uuid::Uuid::new_v4().to_string(),
1005 platform: "hyper-agent".to_string(),
1006 user_id: "agent-loop".to_string(),
1007 channel_id: "agent-loop".to_string(),
1008 text: user_message.to_string(),
1009 reply_token: None,
1010 attachments: vec![],
1011 chat_type: ChatType::DirectMessage,
1012 kind: IncomingEventKind::Message,
1013 };
1014 let thread = Thread::new(
1015 event,
1016 Arc::new(LoopNoopChannel {
1017 agent_id: agent_id.to_string(),
1018 }),
1019 );
1020
1021 let result = loop_engine
1022 .run(&llm_client, &thread, vec![AgentMessage::user(user_message)])
1023 .await
1024 .map_err(|e| format!("AgentLoop failed: {}", e))?;
1025
1026 let tool_calls = result
1027 .tool_calls
1028 .into_iter()
1029 .map(|(tool_name, tool_input)| crate::claude::ToolCall {
1030 tool_name,
1031 tool_input,
1032 })
1033 .collect();
1034
1035 Ok(LlmCallResult {
1036 decision: AgentDecision {
1037 reasoning: result.answer.clone(),
1038 tool_calls,
1039 confidence: extract_confidence_from_text(&result.answer).unwrap_or(0.5),
1040 raw_content: vec![],
1041 },
1042 tool_turn_count: result.iterations.saturating_sub(1) as u32,
1043 })
1044}
1045
1046fn loop_config_dir() -> PathBuf {
1051 let mut path = dirs::config_dir().unwrap_or_else(|| PathBuf::from("."));
1052 path.push("hyper-agent");
1053 path.push("agent-loops");
1054 let _ = fs::create_dir_all(&path);
1055 path
1056}
1057
1058fn loop_config_path(agent_id: &str) -> PathBuf {
1059 loop_config_dir().join(format!("{}.json", agent_id))
1060}
1061
1062fn save_loop_config(config: &LoopConfig) {
1063 let path = loop_config_path(&config.agent_id);
1064 if let Ok(json) = serde_json::to_string_pretty(config) {
1065 let _ = fs::write(&path, json);
1066 }
1067}
1068
1069pub fn load_all_loop_configs() -> Vec<LoopConfig> {
1071 let dir = loop_config_dir();
1072 let entries = match fs::read_dir(&dir) {
1073 Ok(e) => e,
1074 Err(_) => return Vec::new(),
1075 };
1076 let mut configs = Vec::new();
1077 for entry in entries.flatten() {
1078 let path = entry.path();
1079 if path.extension().is_some_and(|ext| ext == "json") {
1080 if let Ok(data) = fs::read_to_string(&path) {
1081 if let Ok(config) = serde_json::from_str::<LoopConfig>(&data) {
1082 configs.push(config);
1083 }
1084 }
1085 }
1086 }
1087 configs
1088}
1089
1090fn log_dir() -> PathBuf {
1092 let mut path = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
1093 path.push("hyper-agent");
1094 path.push("agent-logs");
1095 let _ = fs::create_dir_all(&path);
1096 path
1097}
1098
1099fn log_file_path(agent_id: &str) -> PathBuf {
1100 log_dir().join(format!("{}.jsonl", agent_id))
1101}
1102
1103pub fn write_log_entry(log: &AgentLoopLog) {
1105 let path = log_file_path(&log.agent_id);
1107 if let Ok(json) = serde_json::to_string(log) {
1108 if let Ok(mut file) = fs::OpenOptions::new().create(true).append(true).open(&path) {
1109 let _ = writeln!(file, "{}", json);
1110 }
1111 }
1112 persist_history(log);
1114}
1115
1116#[cfg(test)]
1121mod tests {
1122 use super::*;
1123
1124 #[test]
1125 fn test_loop_state_serialization() {
1126 let state = LoopState::Running;
1127 let json = serde_json::to_string(&state).unwrap();
1128 assert_eq!(json, "\"running\"");
1129
1130 let state = LoopState::Paused;
1131 let json = serde_json::to_string(&state).unwrap();
1132 assert_eq!(json, "\"paused\"");
1133
1134 let state = LoopState::Stopped;
1135 let json = serde_json::to_string(&state).unwrap();
1136 assert_eq!(json, "\"stopped\"");
1137
1138 let state = LoopState::Error;
1139 let json = serde_json::to_string(&state).unwrap();
1140 assert_eq!(json, "\"error\"");
1141 }
1142
1143 #[test]
1144 fn test_loop_state_deserialization() {
1145 let state: LoopState = serde_json::from_str("\"running\"").unwrap();
1146 assert_eq!(state, LoopState::Running);
1147
1148 let state: LoopState = serde_json::from_str("\"paused\"").unwrap();
1149 assert_eq!(state, LoopState::Paused);
1150
1151 let state: LoopState = serde_json::from_str("\"stopped\"").unwrap();
1152 assert_eq!(state, LoopState::Stopped);
1153 }
1154
1155 #[test]
1156 fn test_loop_status_serialization() {
1157 let status = LoopStatus {
1158 agent_id: "agent-1".to_string(),
1159 state: LoopState::Running,
1160 interval_minutes: 15,
1161 iteration_count: 5,
1162 last_iteration_at: Some("2026-03-09T10:00:00Z".to_string()),
1163 last_error: None,
1164 };
1165 let json = serde_json::to_value(&status).unwrap();
1166 assert_eq!(json["agentId"], "agent-1");
1167 assert_eq!(json["state"], "running");
1168 assert_eq!(json["intervalMinutes"], 15);
1169 assert_eq!(json["iterationCount"], 5);
1170 }
1171
1172 #[test]
1173 fn test_loop_config_serialization() {
1174 let config = LoopConfig {
1175 agent_id: "agent-1".to_string(),
1176 interval_minutes: 15,
1177 auto_start: true,
1178 };
1179 let json = serde_json::to_value(&config).unwrap();
1180 assert_eq!(json["agentId"], "agent-1");
1181 assert_eq!(json["intervalMinutes"], 15);
1182 assert_eq!(json["autoStart"], true);
1183 }
1184
1185 #[test]
1186 fn test_agent_loop_log_serialization() {
1187 let log = AgentLoopLog {
1188 agent_id: "agent-1".to_string(),
1189 iteration: 3,
1190 timestamp: "2026-03-09T10:30:00Z".to_string(),
1191 phase: "complete".to_string(),
1192 success: true,
1193 message: "All good".to_string(),
1194 decision_summary: Some("Buy BTC".to_string()),
1195 tool_turn_count: 2,
1196 risk_blocked: false,
1197 order_executed: true,
1198 };
1199 let json = serde_json::to_value(&log).unwrap();
1200 assert_eq!(json["agentId"], "agent-1");
1201 assert_eq!(json["iteration"], 3);
1202 assert_eq!(json["phase"], "complete");
1203 assert_eq!(json["success"], true);
1204 assert_eq!(json["toolTurnCount"], 2);
1205 assert_eq!(json["riskBlocked"], false);
1206 assert_eq!(json["orderExecuted"], true);
1207 }
1208
1209 #[test]
1210 fn test_agent_loop_manager_creation() {
1211 let _manager = AgentLoopManager::new();
1212 }
1213
1214 #[tokio::test]
1215 async fn test_start_and_stop_loop() {
1216 let manager = AgentLoopManager::new();
1217 let result = manager.start_loop("test-agent".to_string(), 1, None).await;
1218 assert!(result.is_ok());
1219
1220 let status = manager.get_status("test-agent").await.unwrap();
1221 assert_eq!(status.state, LoopState::Running);
1222 assert_eq!(status.interval_minutes, 1);
1223
1224 let result = manager.stop_loop("test-agent").await;
1225 assert!(result.is_ok());
1226 }
1227
1228 #[tokio::test]
1229 async fn test_start_duplicate_loop_fails() {
1230 let manager = AgentLoopManager::new();
1231 manager
1232 .start_loop("dup-agent".to_string(), 1, None)
1233 .await
1234 .unwrap();
1235
1236 let result = manager.start_loop("dup-agent".to_string(), 1, None).await;
1237 assert!(result.is_err());
1238 assert!(result.unwrap_err().contains("already"));
1239
1240 manager.stop_loop("dup-agent").await.unwrap();
1241 }
1242
1243 #[tokio::test]
1244 async fn test_stop_nonexistent_loop_fails() {
1245 let manager = AgentLoopManager::new();
1246 let result = manager.stop_loop("nonexistent").await;
1247 assert!(result.is_err());
1248 }
1249
1250 #[tokio::test]
1251 async fn test_pause_and_resume_loop() {
1252 let manager = AgentLoopManager::new();
1253 manager
1254 .start_loop("pr-agent".to_string(), 60, None)
1255 .await
1256 .unwrap();
1257
1258 let result = manager.pause_loop("pr-agent").await;
1259 assert!(result.is_ok());
1260
1261 let status = manager.get_status("pr-agent").await.unwrap();
1262 assert_eq!(status.state, LoopState::Paused);
1263
1264 let result = manager.resume_loop("pr-agent").await;
1265 assert!(result.is_ok());
1266
1267 let status = manager.get_status("pr-agent").await.unwrap();
1268 assert_eq!(status.state, LoopState::Running);
1269
1270 manager.stop_loop("pr-agent").await.unwrap();
1271 }
1272
1273 #[tokio::test]
1274 async fn test_get_status_nonexistent_returns_stopped() {
1275 let manager = AgentLoopManager::new();
1276 let status = manager.get_status("no-such-agent").await.unwrap();
1277 assert_eq!(status.state, LoopState::Stopped);
1278 }
1279
1280 #[test]
1281 fn test_parse_composer_profile() {
1282 assert!(matches!(
1283 parse_composer_profile("conservative"),
1284 Some(ComposerProfile::Conservative)
1285 ));
1286 assert!(matches!(
1287 parse_composer_profile("all_weather"),
1288 Some(ComposerProfile::AllWeather)
1289 ));
1290 assert!(matches!(
1291 parse_composer_profile("turtle_system"),
1292 Some(ComposerProfile::TurtleSystem)
1293 ));
1294 assert!(matches!(
1295 parse_composer_profile("turtle"),
1296 Some(ComposerProfile::TurtleSystem)
1297 ));
1298 assert!(parse_composer_profile("unknown").is_none());
1299 }
1300}