Skip to main content

hyper_agent_ai/
agent_loop.rs

1use std::collections::HashMap;
2use std::fs;
3use std::io::Write;
4use std::path::PathBuf;
5use std::sync::{Arc, OnceLock};
6
7use serde::{Deserialize, Serialize};
8use tokio::sync::Mutex as TokioMutex;
9use tokio::sync::Notify;
10
11use crate::agent_config::{load_agent_config, AgentConfig, TradingMode};
12use crate::agent_history::AgentHistoryStore;
13use crate::claude::AgentDecision;
14use crate::tools::{
15    CancelOrderTool, DoNothingTool, GetOpenOrdersTool, GetPositionsTool, GetTradeHistoryTool,
16    PlaceOrderTool, SetStopLossTool, SetTakeProfitTool,
17};
18use hyper_agent_core::account_state_ext::{account_state_from_exchange, account_state_from_paper};
19use hyper_agent_core::config::AppConfig;
20use hyper_agent_core::executor::{DryRunExecutor, PaperExecutor};
21use hyper_agent_core::live_executor::{AssetMetaCache, LiveExecutor};
22use hyper_agent_core::pipeline::{OrderPipeline, PipelineContext, PipelineError};
23use hyper_agent_core::pipeline_stages::{LoggerStage, OrderRouterStage, RiskGuardStage};
24use hyper_agent_core::position_manager::PositionManager;
25use hyper_agent_core::signal::{Side, SignalAction, SignalSource, TradeSignal};
26
27use hyper_exchange::{ExchangeClient, Signer};
28use hyper_risk::risk::{AccountState, RiskGuard};
29use hyper_strategy::strategy_composer::ComposerProfile;
30#[allow(deprecated)]
31use hyper_ta::technical_analysis::{calculate_indicators, format_technical_summary};
32use motosan_chat_agent::{AgentLoop as MotosanAgentLoop, Message as AgentMessage};
33use motosan_chat_ai::MotosanAiClient;
34use motosan_chat_core::{Channel, ChatType, IncomingEvent, IncomingEventKind, Thread};
35
36// ---------------------------------------------------------------------------
37// Types
38// ---------------------------------------------------------------------------
39
40/// Status of a single agent loop.
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(rename_all = "camelCase")]
43pub enum LoopState {
44    Running,
45    Paused,
46    Stopped,
47    Error,
48}
49
50/// Externally visible status of an agent loop.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "camelCase")]
53pub struct LoopStatus {
54    pub agent_id: String,
55    pub state: LoopState,
56    pub interval_minutes: u64,
57    pub iteration_count: u64,
58    pub last_iteration_at: Option<String>,
59    pub last_error: Option<String>,
60}
61
62/// Persisted configuration for an agent loop (restored on app restart).
63#[derive(Debug, Clone, Serialize, Deserialize)]
64#[serde(rename_all = "camelCase")]
65pub struct LoopConfig {
66    pub agent_id: String,
67    pub interval_minutes: u64,
68    pub auto_start: bool,
69}
70
71/// Log entry for a single loop iteration.
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct AgentLoopLog {
75    pub agent_id: String,
76    pub iteration: u64,
77    pub timestamp: String,
78    pub phase: String,
79    pub success: bool,
80    pub message: String,
81    pub decision_summary: Option<String>,
82    pub tool_turn_count: u32,
83    pub risk_blocked: bool,
84    pub order_executed: bool,
85}
86
87// ---------------------------------------------------------------------------
88// Wallet context for live order execution
89// ---------------------------------------------------------------------------
90
91/// Signing context required for live order execution on Hyperliquid L1.
92///
93/// Contains the agent wallet address and a signer that can produce EIP-712
94/// signatures. The private key is never exposed — only the `Signer` trait
95/// is used, which accepts a 32-byte hash and returns a 65-byte signature.
96#[derive(Clone)]
97pub struct WalletContext {
98    /// The agent wallet address (0x-prefixed hex, e.g. from onboarding).
99    pub agent_address: String,
100    /// Whether to target mainnet (true) or testnet (false).
101    pub is_mainnet: bool,
102    /// Optional vault address for vault-based trading.
103    pub vault_address: Option<String>,
104    /// Signer that holds the private key behind an opaque trait.
105    pub signer: Arc<dyn Signer>,
106}
107
108// ---------------------------------------------------------------------------
109// Internal handle for a running loop
110// ---------------------------------------------------------------------------
111
112struct LoopHandle {
113    status: Arc<TokioMutex<LoopStatus>>,
114    /// Signal to cancel the loop task.
115    cancel: tokio::sync::watch::Sender<bool>,
116    /// Notify to wake up a paused loop or trigger an immediate run.
117    trigger: Arc<Notify>,
118}
119
120// ---------------------------------------------------------------------------
121// AgentLoopManager
122// ---------------------------------------------------------------------------
123
124/// Manages multiple agent loops (one per agent_id).
125pub struct AgentLoopManager {
126    loops: TokioMutex<HashMap<String, LoopHandle>>,
127}
128
129impl AgentLoopManager {
130    pub fn new() -> Self {
131        Self {
132            loops: TokioMutex::new(HashMap::new()),
133        }
134    }
135
136    /// Start an agent loop. If one is already running for this agent, returns an error.
137    ///
138    /// `wallet_context` is required for `TradingMode::Live`. Pass `None` for paper trading.
139    pub async fn start_loop(
140        &self,
141        agent_id: String,
142        interval_minutes: u64,
143        wallet_context: Option<WalletContext>,
144    ) -> Result<(), String> {
145        let mut loops = self.loops.lock().await;
146
147        // If a loop exists and is not stopped, reject.
148        if let Some(handle) = loops.get(&agent_id) {
149            let status = handle.status.lock().await;
150            if status.state == LoopState::Running || status.state == LoopState::Paused {
151                return Err(format!(
152                    "Agent loop for '{}' is already {} -- stop it first",
153                    agent_id,
154                    match status.state {
155                        LoopState::Running => "running",
156                        LoopState::Paused => "paused",
157                        _ => "active",
158                    }
159                ));
160            }
161        }
162
163        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
164        let trigger = Arc::new(Notify::new());
165
166        let status = Arc::new(TokioMutex::new(LoopStatus {
167            agent_id: agent_id.clone(),
168            state: LoopState::Running,
169            interval_minutes,
170            iteration_count: 0,
171            last_iteration_at: None,
172            last_error: None,
173        }));
174
175        // Persist config
176        save_loop_config(&LoopConfig {
177            agent_id: agent_id.clone(),
178            interval_minutes,
179            auto_start: true,
180        });
181
182        // Spawn the loop task
183        let status_clone = Arc::clone(&status);
184        let trigger_clone = Arc::clone(&trigger);
185        let aid = agent_id.clone();
186        let wc = wallet_context.clone();
187        tokio::spawn(async move {
188            run_loop(
189                aid,
190                interval_minutes,
191                status_clone,
192                cancel_rx,
193                trigger_clone,
194                wc,
195            )
196            .await;
197        });
198
199        loops.insert(
200            agent_id,
201            LoopHandle {
202                status,
203                cancel: cancel_tx,
204                trigger,
205            },
206        );
207
208        Ok(())
209    }
210
211    /// Stop an agent loop.
212    pub async fn stop_loop(&self, agent_id: &str) -> Result<(), String> {
213        let mut loops = self.loops.lock().await;
214        let handle = loops
215            .remove(agent_id)
216            .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
217
218        // Signal cancellation
219        let _ = handle.cancel.send(true);
220        // Wake the task so it can observe the cancellation immediately
221        handle.trigger.notify_one();
222
223        // Update status
224        {
225            let mut status = handle.status.lock().await;
226            status.state = LoopState::Stopped;
227        }
228
229        // Update persisted config
230        save_loop_config(&LoopConfig {
231            agent_id: agent_id.to_string(),
232            interval_minutes: handle.status.lock().await.interval_minutes,
233            auto_start: false,
234        });
235
236        Ok(())
237    }
238
239    /// Pause a running loop.
240    pub async fn pause_loop(&self, agent_id: &str) -> Result<(), String> {
241        let loops = self.loops.lock().await;
242        let handle = loops
243            .get(agent_id)
244            .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
245
246        let mut status = handle.status.lock().await;
247        if status.state != LoopState::Running {
248            return Err(format!(
249                "Cannot pause: agent loop is {:?}, not Running",
250                status.state
251            ));
252        }
253        status.state = LoopState::Paused;
254        Ok(())
255    }
256
257    /// Resume a paused loop.
258    pub async fn resume_loop(&self, agent_id: &str) -> Result<(), String> {
259        let loops = self.loops.lock().await;
260        let handle = loops
261            .get(agent_id)
262            .ok_or_else(|| format!("No loop found for agent '{}'", agent_id))?;
263
264        let mut status = handle.status.lock().await;
265        if status.state != LoopState::Paused {
266            return Err(format!(
267                "Cannot resume: agent loop is {:?}, not Paused",
268                status.state
269            ));
270        }
271        status.state = LoopState::Running;
272        // Wake the loop so it starts the next iteration immediately
273        handle.trigger.notify_one();
274        Ok(())
275    }
276
277    /// Trigger a single immediate iteration (regardless of running state).
278    /// If no loop exists, creates a temporary one that runs once and stops.
279    pub async fn trigger_once(&self, agent_id: &str) -> Result<(), String> {
280        let loops = self.loops.lock().await;
281        if let Some(handle) = loops.get(agent_id) {
282            // Trigger the existing loop to run one iteration now
283            handle.trigger.notify_one();
284            Ok(())
285        } else {
286            // No active loop -- run a single iteration inline (paper mode, no wallet)
287            drop(loops);
288            let log = execute_single_iteration(agent_id, 0, None).await;
289            write_log_entry(&log);
290            if log.success {
291                Ok(())
292            } else {
293                Err(log.message)
294            }
295        }
296    }
297
298    /// Get the status of a specific agent loop.
299    pub async fn get_status(&self, agent_id: &str) -> Result<LoopStatus, String> {
300        let loops = self.loops.lock().await;
301        if let Some(handle) = loops.get(agent_id) {
302            Ok(handle.status.lock().await.clone())
303        } else {
304            // Check persisted config for a stopped loop
305            Ok(LoopStatus {
306                agent_id: agent_id.to_string(),
307                state: LoopState::Stopped,
308                interval_minutes: 15,
309                iteration_count: 0,
310                last_iteration_at: None,
311                last_error: None,
312            })
313        }
314    }
315}
316
317// ---------------------------------------------------------------------------
318// Loop runner
319// ---------------------------------------------------------------------------
320
321async fn run_loop(
322    agent_id: String,
323    interval_minutes: u64,
324    status: Arc<TokioMutex<LoopStatus>>,
325    mut cancel_rx: tokio::sync::watch::Receiver<bool>,
326    trigger: Arc<Notify>,
327    wallet_context: Option<WalletContext>,
328) {
329    let interval = tokio::time::Duration::from_secs(interval_minutes * 60);
330
331    loop {
332        // Check cancellation
333        if *cancel_rx.borrow() {
334            break;
335        }
336
337        // Check if paused -- if so, wait for a trigger/cancel
338        {
339            let s = status.lock().await;
340            if s.state == LoopState::Paused {
341                drop(s);
342                // Wait until either triggered (resume) or cancelled
343                tokio::select! {
344                    _ = trigger.notified() => continue,
345                    _ = cancel_rx.changed() => break,
346                }
347            }
348        }
349
350        // Execute one iteration
351        let iteration = {
352            let s = status.lock().await;
353            s.iteration_count
354        };
355
356        let log = execute_single_iteration(&agent_id, iteration, wallet_context.clone()).await;
357        write_log_entry(&log);
358
359        // Update status
360        {
361            let mut s = status.lock().await;
362            if *cancel_rx.borrow() {
363                break;
364            }
365            s.iteration_count += 1;
366            s.last_iteration_at = Some(log.timestamp.clone());
367            if log.success {
368                s.last_error = None;
369            } else {
370                s.last_error = Some(log.message.clone());
371            }
372        }
373
374        // Wait for the next interval, or an early trigger, or cancellation
375        tokio::select! {
376            _ = tokio::time::sleep(interval) => {},
377            _ = trigger.notified() => {},
378            _ = cancel_rx.changed() => break,
379        }
380    }
381
382    // Mark as stopped when we exit
383    let mut s = status.lock().await;
384    s.state = LoopState::Stopped;
385}
386
387// ---------------------------------------------------------------------------
388// Single iteration execution (pipeline-based)
389// ---------------------------------------------------------------------------
390
391/// Convert an AgentDecision (from Claude) into a TradeSignal for the pipeline.
392///
393/// Scans tool_calls for place_order and converts to an Open signal.
394/// Returns None for do_nothing or if no recognised trading tool call is found.
395fn decision_to_trade_signal(
396    decision: &AgentDecision,
397    symbol: &str,
398    agent_id: &str,
399) -> Option<TradeSignal> {
400    for tc in &decision.tool_calls {
401        match tc.tool_name.as_str() {
402            "place_order" => {
403                let side = tc
404                    .tool_input
405                    .get("is_buy")
406                    .and_then(|v| v.as_bool())
407                    .map(|b| if b { Side::Buy } else { Side::Sell })
408                    .unwrap_or(Side::Buy);
409                let size = tc
410                    .tool_input
411                    .get("size")
412                    .and_then(|v| {
413                        v.as_str()
414                            .and_then(|s| s.parse().ok())
415                            .or_else(|| v.as_f64())
416                    })
417                    .unwrap_or(0.0);
418                let price = tc.tool_input.get("price").and_then(|v| {
419                    v.as_str()
420                        .and_then(|s| s.parse().ok())
421                        .or_else(|| v.as_f64())
422                });
423
424                return Some(TradeSignal {
425                    id: uuid::Uuid::new_v4().to_string(),
426                    timestamp: std::time::SystemTime::now()
427                        .duration_since(std::time::UNIX_EPOCH)
428                        .unwrap_or_default()
429                        .as_secs(),
430                    source: SignalSource::Playbook {
431                        strategy_id: agent_id.to_string(),
432                        regime: "claude".to_string(),
433                    },
434                    symbol: symbol.to_string(),
435                    action: SignalAction::Open { side, size, price },
436                    reason: decision.reasoning.clone(),
437                });
438            }
439            "do_nothing" => return None,
440            _ => continue,
441        }
442    }
443    None
444}
445
446/// Create an error AgentLoopLog with minimal boilerplate.
447fn error_log(
448    agent_id: &str,
449    iteration: u64,
450    timestamp: String,
451    phase: &str,
452    message: &str,
453) -> AgentLoopLog {
454    AgentLoopLog {
455        agent_id: agent_id.to_string(),
456        iteration,
457        timestamp,
458        phase: phase.to_string(),
459        success: false,
460        message: message.to_string(),
461        decision_summary: None,
462        tool_turn_count: 0,
463        risk_blocked: false,
464        order_executed: false,
465    }
466}
467
468/// Pipeline-based single iteration: fetches market data, calls Claude, then
469/// routes the resulting signal through the unified OrderPipeline.
470async fn execute_single_iteration(
471    agent_id: &str,
472    iteration: u64,
473    wallet_context: Option<WalletContext>,
474) -> AgentLoopLog {
475    let timestamp = chrono::Utc::now().to_rfc3339();
476
477    // --- Phase 0: Load agent config ---
478    let config = load_agent_config(agent_id).unwrap_or_else(AgentConfig::default);
479
480    let is_mainnet = true;
481    let fetcher = hyper_market::market_data::MarketDataFetcher::new(is_mainnet);
482
483    // --- Phase 1: Fetch real candle data ---
484    let trading_pairs = if config.trading_pairs.is_empty() {
485        vec!["BTC-PERP".to_string()]
486    } else {
487        config.trading_pairs.clone()
488    };
489    let primary_symbol = trading_pairs[0].clone();
490
491    let candles = match fetcher
492        .fetch_candles_with_params(&primary_symbol, "1h", 200)
493        .await
494    {
495        Ok(c) if !c.is_empty() => c,
496        Ok(_) => {
497            return error_log(
498                agent_id,
499                iteration,
500                timestamp,
501                "fetch_market_data",
502                "No candles returned from exchange",
503            );
504        }
505        Err(e) => {
506            return error_log(
507                agent_id,
508                iteration,
509                timestamp,
510                "fetch_market_data",
511                &format!("Failed to fetch market data: {}", e),
512            );
513        }
514    };
515
516    // --- Phase 2: Calculate technical indicators ---
517    #[allow(deprecated)]
518    let indicators = calculate_indicators(&candles);
519    let current_price = candles.last().map(|c| c.close);
520    let ta_summary = format_technical_summary(&primary_symbol, &indicators, current_price);
521
522    // --- Phase 3: Compose strategy signals ---
523    let composer_analysis = if let Some(ref profile_str) = config.composer_profile {
524        if let Some(profile) = parse_composer_profile(profile_str) {
525            let composer =
526                hyper_strategy::strategy_composer::StrategyComposer::new(profile, &primary_symbol);
527            let volume_context =
528                hyper_risk::risk_defaults::analyze_volume_context(&candles, &indicators);
529            let signal = composer.compose_signals(&indicators, None, &volume_context);
530            Some(composer.format_for_claude(&signal))
531        } else {
532            None
533        }
534    } else {
535        None
536    };
537
538    // --- Phase 3b: Fetch funding rate ---
539    let funding_rate_info =
540        match crate::tools::fetch_funding_rate(&primary_symbol, is_mainnet).await {
541            Ok(info) => Some(info),
542            Err(e) => {
543                eprintln!(
544                    "[agent_loop] Failed to fetch funding rate for {}: {}",
545                    primary_symbol, e
546                );
547                None
548            }
549        };
550
551    // --- Phase 4: Build prompt for Claude ---
552    let mut user_message = String::with_capacity(4096);
553    user_message.push_str(&format!("## Market Analysis for {}\n\n", primary_symbol));
554    user_message.push_str(&ta_summary);
555
556    if let Some(ref composer_text) = composer_analysis {
557        user_message.push_str("\n\n## Strategy Composer Analysis\n");
558        user_message.push_str(composer_text);
559    }
560
561    if let Some(price) = current_price {
562        user_message.push_str(&format!("\n\nCurrent Price: ${:.2}", price));
563    }
564
565    if let Some(ref fr_info) = funding_rate_info {
566        user_message.push_str("\n\n## Funding Rate");
567        if let Some(rate_pct) = fr_info.get("funding_rate_pct").and_then(|v| v.as_str()) {
568            user_message.push_str(&format!("\nCurrent Funding Rate: {}", rate_pct));
569        }
570        if let Some(annual_pct) = fr_info.get("annualized_rate_pct").and_then(|v| v.as_str()) {
571            user_message.push_str(&format!("\nAnnualized Rate: {}", annual_pct));
572        }
573        if let Some(direction) = fr_info.get("direction").and_then(|v| v.as_str()) {
574            let direction_desc = match direction {
575                "longs_pay_shorts" => "Longs pay shorts (bullish sentiment)",
576                "shorts_pay_longs" => "Shorts pay longs (bearish sentiment)",
577                _ => "Neutral",
578            };
579            user_message.push_str(&format!("\nDirection: {}", direction_desc));
580        }
581    }
582
583    user_message.push_str("\n\n## Current Positions\nNo position data available in loop context.");
584
585    let mode_str = match config.trading_mode {
586        TradingMode::Paper => "PAPER TRADING (simulated)",
587        TradingMode::Live => "LIVE TRADING (real orders)",
588    };
589    user_message.push_str(&format!("\n\nTrading Mode: {}", mode_str));
590    user_message.push_str(&format!(
591        "\nMax Position Size: ${:.2}",
592        config.max_position_size_usd
593    ));
594    user_message.push_str(
595        "\n\nPlease analyze the market and decide: use place_order to trade, or do_nothing if conditions are not favorable.",
596    );
597
598    let system_prompt = config.system_prompt.clone();
599
600    // --- Phase 5: Call Claude via motosan-chat-agent ---
601    let llm_result = match call_claude_real(
602        agent_id,
603        &system_prompt,
604        &user_message,
605        config.max_retries,
606        config.max_tool_turns,
607    )
608    .await
609    {
610        Ok(outcome) => outcome,
611        Err(e) => {
612            eprintln!("[agent_loop] Claude API call failed: {}", e);
613            return error_log(
614                agent_id,
615                iteration,
616                timestamp,
617                "call_claude",
618                &format!("Claude API error: {}", e),
619            );
620        }
621    };
622
623    let decision = &llm_result.decision;
624
625    let decision_summary = format!(
626        "reasoning={} tool_calls={} confidence={:.2}",
627        &decision.reasoning[..decision.reasoning.len().min(200)],
628        decision.tool_calls.len(),
629        decision.confidence,
630    );
631
632    // --- Phase 6: Convert decision to TradeSignal and run through pipeline ---
633    let signal = decision_to_trade_signal(decision, &primary_symbol, agent_id);
634
635    match signal {
636        Some(sig) => {
637            // Build position manager with a deterministic per-agent DB path
638            let db_dir = {
639                let mut p = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
640                p.push("hyper-agent");
641                p.push("positions");
642                let _ = fs::create_dir_all(&p);
643                p
644            };
645            let db_path = db_dir.join(format!("{}.db", agent_id));
646            let pm = match PositionManager::new(db_path.to_str().unwrap_or("positions.db")) {
647                Ok(pm) => Arc::new(pm),
648                Err(e) => {
649                    return error_log(
650                        agent_id,
651                        iteration,
652                        timestamp,
653                        "pipeline_init",
654                        &format!("Failed to open position DB: {}", e),
655                    );
656                }
657            };
658
659            let paper_exec = Arc::new(PaperExecutor::new(Arc::clone(&pm)));
660            let dry_run = Arc::new(DryRunExecutor);
661
662            // Build live executor from wallet_context when available
663            let live_exec: Option<Arc<dyn hyper_agent_core::executor::OrderSubmitter>> =
664                if matches!(config.trading_mode, TradingMode::Live) {
665                    if let Some(ref wc) = wallet_context {
666                        let client = ExchangeClient::new(wc.is_mainnet);
667                        match AssetMetaCache::fetch(&client).await {
668                            Ok(meta_cache) => {
669                                let executor = LiveExecutor::new(
670                                    Arc::clone(&wc.signer),
671                                    wc.agent_address.clone(),
672                                    wc.vault_address.clone(),
673                                    wc.is_mainnet,
674                                    client,
675                                    meta_cache,
676                                );
677                                Some(Arc::new(executor))
678                            }
679                            Err(e) => {
680                                eprintln!(
681                                    "[agent_loop] Failed to fetch asset meta for live mode, \
682                                     falling back to paper: {}",
683                                    e
684                                );
685                                None
686                            }
687                        }
688                    } else {
689                        None
690                    }
691                } else {
692                    None
693                };
694
695            let pipeline = OrderPipeline::new(vec![
696                Box::new(RiskGuardStage::new()),
697                Box::new(OrderRouterStage::new(
698                    paper_exec,
699                    live_exec.clone(),
700                    dry_run,
701                )),
702                Box::new(LoggerStage),
703            ]);
704
705            // Use exchange account state in live mode, paper otherwise
706            let (account_state, mode) = if live_exec.is_some() {
707                if let Some(ref wc) = wallet_context {
708                    let client = ExchangeClient::new(wc.is_mainnet);
709                    match account_state_from_exchange(&client, &wc.agent_address).await {
710                        Ok(state) => (state, hyper_agent_core::pipeline::TradingMode::Live),
711                        Err(e) => {
712                            eprintln!(
713                                "[agent_loop] Failed to fetch exchange account state, \
714                                     falling back to paper: {}",
715                                e
716                            );
717                            (
718                                account_state_from_paper(&pm).await,
719                                hyper_agent_core::pipeline::TradingMode::Paper,
720                            )
721                        }
722                    }
723                } else {
724                    (
725                        account_state_from_paper(&pm).await,
726                        hyper_agent_core::pipeline::TradingMode::Paper,
727                    )
728                }
729            } else {
730                (
731                    account_state_from_paper(&pm).await,
732                    hyper_agent_core::pipeline::TradingMode::Paper,
733                )
734            };
735
736            let mut ctx = PipelineContext {
737                mode,
738                account_state,
739                position_manager: pm,
740                config: Arc::new(AppConfig::default()),
741                execution_results: vec![],
742            };
743
744            match pipeline.execute(sig, &mut ctx).await {
745                Ok(_) => AgentLoopLog {
746                    agent_id: agent_id.to_string(),
747                    iteration,
748                    timestamp,
749                    phase: "complete".to_string(),
750                    success: true,
751                    message: "Order executed successfully via pipeline".to_string(),
752                    decision_summary: Some(decision_summary),
753                    tool_turn_count: llm_result.tool_turn_count,
754                    risk_blocked: false,
755                    order_executed: true,
756                },
757                Err(PipelineError::RiskBlocked(msg)) => AgentLoopLog {
758                    agent_id: agent_id.to_string(),
759                    iteration,
760                    timestamp,
761                    phase: "complete".to_string(),
762                    success: true,
763                    message: format!("Risk check blocked execution: {}", msg),
764                    decision_summary: Some(decision_summary),
765                    tool_turn_count: llm_result.tool_turn_count,
766                    risk_blocked: true,
767                    order_executed: false,
768                },
769                Err(PipelineError::CircuitBreakerTripped) => AgentLoopLog {
770                    agent_id: agent_id.to_string(),
771                    iteration,
772                    timestamp,
773                    phase: "complete".to_string(),
774                    success: true,
775                    message: "Circuit breaker tripped -- positions auto-closed".to_string(),
776                    decision_summary: Some(decision_summary),
777                    tool_turn_count: llm_result.tool_turn_count,
778                    risk_blocked: true,
779                    order_executed: false,
780                },
781                Err(e) => error_log(agent_id, iteration, timestamp, "pipeline", &e.to_string()),
782            }
783        }
784        None => {
785            // Claude decided to do nothing (hold)
786            let truncated_reasoning = &decision.reasoning[..decision.reasoning.len().min(200)];
787            AgentLoopLog {
788                agent_id: agent_id.to_string(),
789                iteration,
790                timestamp,
791                phase: "complete".to_string(),
792                success: true,
793                message: "No trade signal -- hold".to_string(),
794                decision_summary: Some(truncated_reasoning.to_string()),
795                tool_turn_count: llm_result.tool_turn_count,
796                risk_blocked: false,
797                order_executed: false,
798            }
799        }
800    }
801}
802
803// ---------------------------------------------------------------------------
804// Helpers
805// ---------------------------------------------------------------------------
806
807/// Parse a composer profile string into a `ComposerProfile` enum.
808pub fn parse_composer_profile(profile: &str) -> Option<ComposerProfile> {
809    match profile.to_lowercase().as_str() {
810        "conservative" => Some(ComposerProfile::Conservative),
811        "all_weather" | "allweather" => Some(ComposerProfile::AllWeather),
812        "turtle_system" | "turtlesystem" | "turtle" => Some(ComposerProfile::TurtleSystem),
813        _ => None,
814    }
815}
816
817fn extract_confidence_from_text(text: &str) -> Option<f64> {
818    let lower = text.to_lowercase();
819    for line in lower.lines() {
820        let line = line.trim();
821        if !line.contains("confidence") {
822            continue;
823        }
824        for part in line.split_whitespace() {
825            if let Ok(value) = part
826                .trim_matches(|c: char| !c.is_ascii_digit() && c != '.')
827                .parse::<f64>()
828            {
829                if (0.0..=1.0).contains(&value) {
830                    return Some(value);
831                }
832            }
833        }
834    }
835    None
836}
837
838// ---------------------------------------------------------------------------
839// Thinking event emitter (used by Tauri)
840// ---------------------------------------------------------------------------
841
842type ThinkingEventEmitter = Arc<dyn Fn(String, String) + Send + Sync>;
843static THINKING_EVENT_EMITTER: OnceLock<ThinkingEventEmitter> = OnceLock::new();
844
845pub fn set_thinking_event_emitter(emitter: ThinkingEventEmitter) {
846    let _ = THINKING_EVENT_EMITTER.set(emitter);
847}
848
849fn emit_thinking_event(agent_id: &str, chunk: &str) {
850    if let Some(emitter) = THINKING_EVENT_EMITTER.get() {
851        emitter(agent_id.to_string(), chunk.to_string());
852    }
853}
854
855// ---------------------------------------------------------------------------
856// Global agent history store (SQLite)
857// ---------------------------------------------------------------------------
858
859static HISTORY_STORE: OnceLock<std::sync::Mutex<AgentHistoryStore>> = OnceLock::new();
860
861/// Initialise the global agent-history SQLite store.
862///
863/// Safe to call multiple times -- only the first call takes effect.
864/// Typically called once at daemon startup.
865pub fn init_history_store(db_path: &str) {
866    let _ = HISTORY_STORE.get_or_init(|| {
867        let store = AgentHistoryStore::new(db_path).expect("Failed to open agent history database");
868        std::sync::Mutex::new(store)
869    });
870}
871
872/// Insert a log entry into the global history store (if initialised).
873fn persist_history(log: &AgentLoopLog) {
874    if let Some(store) = HISTORY_STORE.get() {
875        if let Ok(guard) = store.lock() {
876            if let Err(e) = guard.insert(log) {
877                tracing::warn!(error = %e, "Failed to persist agent history entry");
878            }
879        }
880    }
881}
882
883/// Query the global history store. Returns an empty vec if not initialised.
884pub fn query_history(agent_id: &str, limit: usize) -> Vec<AgentLoopLog> {
885    HISTORY_STORE
886        .get()
887        .and_then(|store| store.lock().ok())
888        .and_then(|guard| guard.list(agent_id, limit).ok())
889        .unwrap_or_default()
890}
891
892// ---------------------------------------------------------------------------
893// No-op channel for agent loop (used by call_claude_real)
894// ---------------------------------------------------------------------------
895
896struct LoopNoopChannel {
897    agent_id: String,
898}
899
900#[async_trait::async_trait]
901impl Channel for LoopNoopChannel {
902    fn name(&self) -> &str {
903        "agent-loop"
904    }
905
906    async fn listen(
907        &self,
908        _tx: tokio::sync::mpsc::Sender<IncomingEvent>,
909    ) -> motosan_chat_core::Result<()> {
910        Ok(())
911    }
912
913    async fn stream_chunk(&self, _user_id: &str, _chunk: &str) -> motosan_chat_core::Result<()> {
914        emit_thinking_event(&self.agent_id, _chunk);
915        Ok(())
916    }
917
918    async fn stream_done(&self, _user_id: &str) -> motosan_chat_core::Result<()> {
919        Ok(())
920    }
921
922    async fn send(&self, _user_id: &str, _text: &str) -> motosan_chat_core::Result<()> {
923        Ok(())
924    }
925}
926
927// ---------------------------------------------------------------------------
928// Real Claude API call
929// ---------------------------------------------------------------------------
930
931#[derive(Debug, Clone)]
932struct LlmCallResult {
933    decision: AgentDecision,
934    tool_turn_count: u32,
935}
936
937/// Run motosan-chat-agent AgentLoop and convert output to AgentDecision.
938async fn call_claude_real(
939    agent_id: &str,
940    system_prompt: &str,
941    user_message: &str,
942    max_retries: u32,
943    max_tool_turns: u32,
944) -> Result<LlmCallResult, String> {
945    let cred_resolver = hyper_agent_core::credentials::CredentialResolver::new(
946        hyper_agent_core::config::CredentialsSection::default(),
947    );
948    let api_key = cred_resolver
949        .anthropic_key()
950        .ok_or_else(|| "No Anthropic API key found (set ANTHROPIC_API_KEY or ANTHROPIC_OAUTH_TOKEN env var, or configure in TOML)".to_string())?;
951
952    let retry_policy = motosan_ai::RetryPolicy::new()
953        .max_retries(max_retries)
954        .base_delay_ms(1_000)
955        .max_delay_ms(30_000)
956        .jitter(true)
957        .respect_retry_after(true);
958
959    let ai_client = motosan_ai::Client::builder()
960        .provider(motosan_ai::Provider::Anthropic)
961        .api_key(api_key)
962        .model(crate::claude::ClaudeModel::Sonnet4.as_str())
963        .retry_policy(retry_policy)
964        .build()
965        .map_err(|e| format!("Claude client build failed: {}", e))?;
966
967    let llm_client = MotosanAiClient::new(ai_client).with_system(system_prompt);
968
969    let risk_config = hyper_risk::risk::get_risk_config_sync();
970    let account_state = AccountState::default();
971
972    // Build a PositionManager for tools that need position data
973    let tool_pm = {
974        let mut p = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
975        p.push("hyper-agent");
976        p.push("positions");
977        let _ = std::fs::create_dir_all(&p);
978        p.push(format!("{}.db", agent_id));
979        Arc::new(
980            PositionManager::new(p.to_str().unwrap_or("positions.db"))
981                .map_err(|e| format!("Failed to open position DB for tools: {}", e))?,
982        )
983    };
984
985    let loop_engine = MotosanAgentLoop::builder()
986        .tool(GetPositionsTool::new(Arc::clone(&tool_pm)))
987        .tool(PlaceOrderTool::new(
988            RiskGuard::new(risk_config.clone()),
989            account_state.clone(),
990        ))
991        .tool(CancelOrderTool::new(
992            RiskGuard::new(risk_config),
993            account_state,
994        ))
995        .tool(DoNothingTool)
996        .tool(SetStopLossTool)
997        .tool(SetTakeProfitTool)
998        .tool(GetOpenOrdersTool::new(true))
999        .tool(GetTradeHistoryTool::new(true))
1000        .max_iterations(max_tool_turns as usize)
1001        .build();
1002
1003    let event = IncomingEvent {
1004        id: uuid::Uuid::new_v4().to_string(),
1005        platform: "hyper-agent".to_string(),
1006        user_id: "agent-loop".to_string(),
1007        channel_id: "agent-loop".to_string(),
1008        text: user_message.to_string(),
1009        reply_token: None,
1010        attachments: vec![],
1011        chat_type: ChatType::DirectMessage,
1012        kind: IncomingEventKind::Message,
1013    };
1014    let thread = Thread::new(
1015        event,
1016        Arc::new(LoopNoopChannel {
1017            agent_id: agent_id.to_string(),
1018        }),
1019    );
1020
1021    let result = loop_engine
1022        .run(&llm_client, &thread, vec![AgentMessage::user(user_message)])
1023        .await
1024        .map_err(|e| format!("AgentLoop failed: {}", e))?;
1025
1026    let tool_calls = result
1027        .tool_calls
1028        .into_iter()
1029        .map(|(tool_name, tool_input)| crate::claude::ToolCall {
1030            tool_name,
1031            tool_input,
1032        })
1033        .collect();
1034
1035    Ok(LlmCallResult {
1036        decision: AgentDecision {
1037            reasoning: result.answer.clone(),
1038            tool_calls,
1039            confidence: extract_confidence_from_text(&result.answer).unwrap_or(0.5),
1040            raw_content: vec![],
1041        },
1042        tool_turn_count: result.iterations.saturating_sub(1) as u32,
1043    })
1044}
1045
1046// ---------------------------------------------------------------------------
1047// Persistence helpers
1048// ---------------------------------------------------------------------------
1049
1050fn loop_config_dir() -> PathBuf {
1051    let mut path = dirs::config_dir().unwrap_or_else(|| PathBuf::from("."));
1052    path.push("hyper-agent");
1053    path.push("agent-loops");
1054    let _ = fs::create_dir_all(&path);
1055    path
1056}
1057
1058fn loop_config_path(agent_id: &str) -> PathBuf {
1059    loop_config_dir().join(format!("{}.json", agent_id))
1060}
1061
1062fn save_loop_config(config: &LoopConfig) {
1063    let path = loop_config_path(&config.agent_id);
1064    if let Ok(json) = serde_json::to_string_pretty(config) {
1065        let _ = fs::write(&path, json);
1066    }
1067}
1068
1069/// Load all persisted loop configs.
1070pub fn load_all_loop_configs() -> Vec<LoopConfig> {
1071    let dir = loop_config_dir();
1072    let entries = match fs::read_dir(&dir) {
1073        Ok(e) => e,
1074        Err(_) => return Vec::new(),
1075    };
1076    let mut configs = Vec::new();
1077    for entry in entries.flatten() {
1078        let path = entry.path();
1079        if path.extension().is_some_and(|ext| ext == "json") {
1080            if let Ok(data) = fs::read_to_string(&path) {
1081                if let Ok(config) = serde_json::from_str::<LoopConfig>(&data) {
1082                    configs.push(config);
1083                }
1084            }
1085        }
1086    }
1087    configs
1088}
1089
1090/// Log directory for agent loop iterations.
1091fn log_dir() -> PathBuf {
1092    let mut path = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
1093    path.push("hyper-agent");
1094    path.push("agent-logs");
1095    let _ = fs::create_dir_all(&path);
1096    path
1097}
1098
1099fn log_file_path(agent_id: &str) -> PathBuf {
1100    log_dir().join(format!("{}.jsonl", agent_id))
1101}
1102
1103/// Append a log entry to the agent's JSONL log file and persist to SQLite.
1104pub fn write_log_entry(log: &AgentLoopLog) {
1105    // JSONL file log (existing behaviour)
1106    let path = log_file_path(&log.agent_id);
1107    if let Ok(json) = serde_json::to_string(log) {
1108        if let Ok(mut file) = fs::OpenOptions::new().create(true).append(true).open(&path) {
1109            let _ = writeln!(file, "{}", json);
1110        }
1111    }
1112    // SQLite history store
1113    persist_history(log);
1114}
1115
1116// ---------------------------------------------------------------------------
1117// Tests
1118// ---------------------------------------------------------------------------
1119
1120#[cfg(test)]
1121mod tests {
1122    use super::*;
1123
1124    #[test]
1125    fn test_loop_state_serialization() {
1126        let state = LoopState::Running;
1127        let json = serde_json::to_string(&state).unwrap();
1128        assert_eq!(json, "\"running\"");
1129
1130        let state = LoopState::Paused;
1131        let json = serde_json::to_string(&state).unwrap();
1132        assert_eq!(json, "\"paused\"");
1133
1134        let state = LoopState::Stopped;
1135        let json = serde_json::to_string(&state).unwrap();
1136        assert_eq!(json, "\"stopped\"");
1137
1138        let state = LoopState::Error;
1139        let json = serde_json::to_string(&state).unwrap();
1140        assert_eq!(json, "\"error\"");
1141    }
1142
1143    #[test]
1144    fn test_loop_state_deserialization() {
1145        let state: LoopState = serde_json::from_str("\"running\"").unwrap();
1146        assert_eq!(state, LoopState::Running);
1147
1148        let state: LoopState = serde_json::from_str("\"paused\"").unwrap();
1149        assert_eq!(state, LoopState::Paused);
1150
1151        let state: LoopState = serde_json::from_str("\"stopped\"").unwrap();
1152        assert_eq!(state, LoopState::Stopped);
1153    }
1154
1155    #[test]
1156    fn test_loop_status_serialization() {
1157        let status = LoopStatus {
1158            agent_id: "agent-1".to_string(),
1159            state: LoopState::Running,
1160            interval_minutes: 15,
1161            iteration_count: 5,
1162            last_iteration_at: Some("2026-03-09T10:00:00Z".to_string()),
1163            last_error: None,
1164        };
1165        let json = serde_json::to_value(&status).unwrap();
1166        assert_eq!(json["agentId"], "agent-1");
1167        assert_eq!(json["state"], "running");
1168        assert_eq!(json["intervalMinutes"], 15);
1169        assert_eq!(json["iterationCount"], 5);
1170    }
1171
1172    #[test]
1173    fn test_loop_config_serialization() {
1174        let config = LoopConfig {
1175            agent_id: "agent-1".to_string(),
1176            interval_minutes: 15,
1177            auto_start: true,
1178        };
1179        let json = serde_json::to_value(&config).unwrap();
1180        assert_eq!(json["agentId"], "agent-1");
1181        assert_eq!(json["intervalMinutes"], 15);
1182        assert_eq!(json["autoStart"], true);
1183    }
1184
1185    #[test]
1186    fn test_agent_loop_log_serialization() {
1187        let log = AgentLoopLog {
1188            agent_id: "agent-1".to_string(),
1189            iteration: 3,
1190            timestamp: "2026-03-09T10:30:00Z".to_string(),
1191            phase: "complete".to_string(),
1192            success: true,
1193            message: "All good".to_string(),
1194            decision_summary: Some("Buy BTC".to_string()),
1195            tool_turn_count: 2,
1196            risk_blocked: false,
1197            order_executed: true,
1198        };
1199        let json = serde_json::to_value(&log).unwrap();
1200        assert_eq!(json["agentId"], "agent-1");
1201        assert_eq!(json["iteration"], 3);
1202        assert_eq!(json["phase"], "complete");
1203        assert_eq!(json["success"], true);
1204        assert_eq!(json["toolTurnCount"], 2);
1205        assert_eq!(json["riskBlocked"], false);
1206        assert_eq!(json["orderExecuted"], true);
1207    }
1208
1209    #[test]
1210    fn test_agent_loop_manager_creation() {
1211        let _manager = AgentLoopManager::new();
1212    }
1213
1214    #[tokio::test]
1215    async fn test_start_and_stop_loop() {
1216        let manager = AgentLoopManager::new();
1217        let result = manager.start_loop("test-agent".to_string(), 1, None).await;
1218        assert!(result.is_ok());
1219
1220        let status = manager.get_status("test-agent").await.unwrap();
1221        assert_eq!(status.state, LoopState::Running);
1222        assert_eq!(status.interval_minutes, 1);
1223
1224        let result = manager.stop_loop("test-agent").await;
1225        assert!(result.is_ok());
1226    }
1227
1228    #[tokio::test]
1229    async fn test_start_duplicate_loop_fails() {
1230        let manager = AgentLoopManager::new();
1231        manager
1232            .start_loop("dup-agent".to_string(), 1, None)
1233            .await
1234            .unwrap();
1235
1236        let result = manager.start_loop("dup-agent".to_string(), 1, None).await;
1237        assert!(result.is_err());
1238        assert!(result.unwrap_err().contains("already"));
1239
1240        manager.stop_loop("dup-agent").await.unwrap();
1241    }
1242
1243    #[tokio::test]
1244    async fn test_stop_nonexistent_loop_fails() {
1245        let manager = AgentLoopManager::new();
1246        let result = manager.stop_loop("nonexistent").await;
1247        assert!(result.is_err());
1248    }
1249
1250    #[tokio::test]
1251    async fn test_pause_and_resume_loop() {
1252        let manager = AgentLoopManager::new();
1253        manager
1254            .start_loop("pr-agent".to_string(), 60, None)
1255            .await
1256            .unwrap();
1257
1258        let result = manager.pause_loop("pr-agent").await;
1259        assert!(result.is_ok());
1260
1261        let status = manager.get_status("pr-agent").await.unwrap();
1262        assert_eq!(status.state, LoopState::Paused);
1263
1264        let result = manager.resume_loop("pr-agent").await;
1265        assert!(result.is_ok());
1266
1267        let status = manager.get_status("pr-agent").await.unwrap();
1268        assert_eq!(status.state, LoopState::Running);
1269
1270        manager.stop_loop("pr-agent").await.unwrap();
1271    }
1272
1273    #[tokio::test]
1274    async fn test_get_status_nonexistent_returns_stopped() {
1275        let manager = AgentLoopManager::new();
1276        let status = manager.get_status("no-such-agent").await.unwrap();
1277        assert_eq!(status.state, LoopState::Stopped);
1278    }
1279
1280    #[test]
1281    fn test_parse_composer_profile() {
1282        assert!(matches!(
1283            parse_composer_profile("conservative"),
1284            Some(ComposerProfile::Conservative)
1285        ));
1286        assert!(matches!(
1287            parse_composer_profile("all_weather"),
1288            Some(ComposerProfile::AllWeather)
1289        ));
1290        assert!(matches!(
1291            parse_composer_profile("turtle_system"),
1292            Some(ComposerProfile::TurtleSystem)
1293        ));
1294        assert!(matches!(
1295            parse_composer_profile("turtle"),
1296            Some(ComposerProfile::TurtleSystem)
1297        ));
1298        assert!(parse_composer_profile("unknown").is_none());
1299    }
1300}