Skip to main content

rustyclaw_core/gateway/
thinking_clock.rs

//! Thinking Clock — periodic reflection with multi-model orchestration.
//!
//! The Thinking Clock provides ambient awareness by periodically running a
//! cheap/local model to assess whether the agent should proactively take
//! action. This mirrors OpenClaw's "Thinking Clock" feature:
//!
//! - A **ticker** fires at a configurable interval (e.g., every 5 minutes).
//! - A **cheap model** (e.g., local Ollama, economy tier) evaluates the
//!   current context and decides if any action is needed.
//! - If the cheap model detects something worth acting on, it **escalates**
//!   to the primary (more capable) model for actual execution.
//!
//! Use cases:
//! - Monitor cron job results and alert on failures.
//! - Check for pending messages that need follow-up.
//! - Periodic status summaries in messenger channels.
//! - Background awareness of system health.
18
19use serde::{Deserialize, Serialize};
20use std::time::Duration;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, info};
23
24/// Configuration for the Thinking Clock.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct ThinkingClockConfig {
27    /// Whether the thinking clock is enabled.
28    #[serde(default)]
29    pub enabled: bool,
30
31    /// Interval in seconds between ticks (default: 300 = 5 minutes).
32    #[serde(default = "default_interval")]
33    pub interval_secs: u64,
34
35    /// Model ID for the cheap/ambient check (e.g., "ollama/llama3.2:3b").
36    /// If not set, uses the cheapest available model from the registry.
37    #[serde(default)]
38    pub ambient_model: Option<String>,
39
40    /// Model ID for escalation (primary model).
41    /// If not set, uses the active model.
42    #[serde(default)]
43    pub escalation_model: Option<String>,
44
45    /// System prompt for the ambient check.
46    #[serde(default = "default_ambient_prompt")]
47    pub ambient_prompt: String,
48
49    /// Maximum tokens for the ambient check (keep low to stay cheap).
50    #[serde(default = "default_ambient_max_tokens")]
51    pub ambient_max_tokens: u32,
52
53    /// Keywords/phrases that trigger escalation from the ambient model's
54    /// response (e.g., ["ESCALATE", "ACTION_NEEDED"]).
55    #[serde(default = "default_escalation_triggers")]
56    pub escalation_triggers: Vec<String>,
57
58    /// Whether to log ambient check results (even when no action taken).
59    #[serde(default)]
60    pub verbose_logging: bool,
61}
62
/// Serde default for `interval_secs`: five minutes, expressed in seconds.
fn default_interval() -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    5 * SECS_PER_MINUTE
}
66
/// Serde default for `ambient_prompt`.
///
/// The prompt asks the model to answer `ESCALATE: <reason>` or `OK`.
fn default_ambient_prompt() -> String {
    // The pieces are joined at compile time with no separator, so each one
    // carries its own trailing space.
    String::from(concat!(
        "You are an ambient awareness monitor. Review the current context and ",
        "determine if any action is needed. If action is needed, respond with ",
        "'ESCALATE: <reason>'. If no action is needed, respond with 'OK'.",
    ))
}
73
/// Serde default for `ambient_max_tokens`.
fn default_ambient_max_tokens() -> u32 {
    const AMBIENT_TOKEN_BUDGET: u32 = 256;
    AMBIENT_TOKEN_BUDGET
}
77
/// Serde default for `escalation_triggers`: the built-in keyword list, in
/// the order the keywords are tried.
fn default_escalation_triggers() -> Vec<String> {
    ["ESCALATE", "ACTION_NEEDED", "ALERT"]
        .iter()
        .map(|keyword| (*keyword).to_owned())
        .collect()
}
85
86impl Default for ThinkingClockConfig {
87    fn default() -> Self {
88        Self {
89            enabled: false,
90            interval_secs: default_interval(),
91            ambient_model: None,
92            escalation_model: None,
93            ambient_prompt: default_ambient_prompt(),
94            ambient_max_tokens: default_ambient_max_tokens(),
95            escalation_triggers: default_escalation_triggers(),
96            verbose_logging: false,
97        }
98    }
99}
100
101/// Result of an ambient check.
102#[derive(Debug, Clone, Serialize)]
103pub struct AmbientCheckResult {
104    /// The ambient model's response.
105    pub response: String,
106    /// Whether escalation was triggered.
107    pub escalated: bool,
108    /// The reason for escalation (if any).
109    pub escalation_reason: Option<String>,
110    /// Duration of the check.
111    pub duration_ms: u64,
112}
113
114/// Check if an ambient response should trigger escalation.
115pub fn should_escalate(response: &str, triggers: &[String]) -> Option<String> {
116    // Use case-insensitive search directly on the original string to avoid
117    // byte-position misalignment between `to_uppercase()` and the original
118    // (multi-byte characters like ß→SS can change byte lengths).
119    let response_lower = response.to_lowercase();
120
121    for trigger in triggers {
122        let trigger_lower = trigger.to_lowercase();
123        if let Some(pos) = response_lower.find(&trigger_lower) {
124            // `pos` is a byte offset into `response_lower` which has the
125            // same byte length as `response` (lowercasing ASCII-range
126            // characters preserves byte length for the trigger keywords
127            // we care about: ESCALATE, ACTION_NEEDED, ALERT).
128            let after = &response[pos + trigger.len()..];
129            let reason = after
130                .trim_start_matches(':')
131                .trim_start_matches(' ')
132                .trim();
133            let reason = if reason.is_empty() {
134                response.to_string()
135            } else {
136                reason.to_string()
137            };
138
139            debug!(trigger = %trigger, reason = %reason, "Escalation triggered");
140            return Some(reason);
141        }
142    }
143
144    None
145}
146
147/// State for the thinking clock tick loop.
148pub struct ThinkingClock {
149    config: ThinkingClockConfig,
150    tick_count: u64,
151}
152
153impl ThinkingClock {
154    /// Create a new thinking clock.
155    pub fn new(config: ThinkingClockConfig) -> Self {
156        Self {
157            config,
158            tick_count: 0,
159        }
160    }
161
162    /// Get the tick interval.
163    pub fn interval(&self) -> Duration {
164        Duration::from_secs(self.config.interval_secs)
165    }
166
167    /// Get the ambient model ID.
168    pub fn ambient_model(&self) -> Option<&str> {
169        self.config.ambient_model.as_deref()
170    }
171
172    /// Get the escalation model ID.
173    pub fn escalation_model(&self) -> Option<&str> {
174        self.config.escalation_model.as_deref()
175    }
176
177    /// Get the ambient prompt.
178    pub fn ambient_prompt(&self) -> &str {
179        &self.config.ambient_prompt
180    }
181
182    /// Get max tokens for ambient check.
183    pub fn ambient_max_tokens(&self) -> u32 {
184        self.config.ambient_max_tokens
185    }
186
187    /// Record a tick and return the tick count.
188    pub fn tick(&mut self) -> u64 {
189        self.tick_count += 1;
190        if self.config.verbose_logging {
191            debug!(tick = self.tick_count, "Thinking clock tick");
192        }
193        self.tick_count
194    }
195
196    /// Process an ambient model response.
197    pub fn process_response(&self, response: &str, duration_ms: u64) -> AmbientCheckResult {
198        let escalation = should_escalate(response, &self.config.escalation_triggers);
199        let escalated = escalation.is_some();
200
201        if escalated {
202            info!(
203                reason = ?escalation,
204                "Thinking clock: escalation triggered"
205            );
206        } else if self.config.verbose_logging {
207            debug!(response = %response, "Thinking clock: no action needed");
208        }
209
210        AmbientCheckResult {
211            response: response.to_string(),
212            escalated,
213            escalation_reason: escalation,
214            duration_ms,
215        }
216    }
217
218    /// Check if the thinking clock is enabled.
219    pub fn is_enabled(&self) -> bool {
220        self.config.enabled
221    }
222
223    /// Get the current tick count.
224    pub fn tick_count(&self) -> u64 {
225        self.tick_count
226    }
227}
228
229/// Run the thinking clock loop.
230///
231/// This is a skeleton that the gateway integrates with its model dispatch.
232/// The actual model calls are performed by the gateway using the ambient
233/// and escalation model IDs from the config.
234pub async fn run_thinking_clock_loop(
235    config: ThinkingClockConfig,
236    cancel: CancellationToken,
237    // The gateway provides a callback for each tick
238    on_tick: impl Fn(u64) + Send + 'static,
239) {
240    if !config.enabled {
241        debug!("Thinking clock disabled");
242        return;
243    }
244
245    let mut clock = ThinkingClock::new(config);
246    let interval = clock.interval();
247
248    info!(
249        interval_secs = interval.as_secs(),
250        "Thinking clock started"
251    );
252
253    loop {
254        tokio::select! {
255            _ = cancel.cancelled() => {
256                info!("Thinking clock stopped");
257                break;
258            }
259            _ = tokio::time::sleep(interval) => {
260                let tick = clock.tick();
261                on_tick(tick);
262            }
263        }
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    /// Turn string literals into the owned trigger list `should_escalate`
    /// expects.
    fn owned(words: &[&str]) -> Vec<String> {
        words.iter().map(|word| word.to_string()).collect()
    }

    /// A default config is disabled and carries the documented defaults.
    #[test]
    fn test_config_defaults() {
        let ThinkingClockConfig {
            enabled,
            interval_secs,
            ambient_max_tokens,
            escalation_triggers,
            ..
        } = ThinkingClockConfig::default();

        assert!(!enabled);
        assert_eq!(interval_secs, 300);
        assert_eq!(ambient_max_tokens, 256);
        assert!(!escalation_triggers.is_empty());
    }

    /// A trigger followed by `: reason` yields just the reason.
    #[test]
    fn test_should_escalate_yes() {
        let triggers = owned(&["ESCALATE", "ALERT"]);

        let outcome = should_escalate("ESCALATE: server is down", &triggers);
        assert!(outcome.is_some());
        assert_eq!(outcome.as_deref(), Some("server is down"));
    }

    /// A response without any trigger does not escalate.
    #[test]
    fn test_should_escalate_no() {
        let triggers = owned(&["ESCALATE"]);

        let outcome = should_escalate("OK - all systems normal", &triggers);
        assert!(outcome.is_none());
    }

    /// Trigger matching ignores letter case.
    #[test]
    fn test_should_escalate_case_insensitive() {
        let triggers = owned(&["ESCALATE"]);

        let outcome = should_escalate("escalate: need attention", &triggers);
        assert!(outcome.is_some());
    }

    /// Each `tick()` bumps the counter by one and returns the new value.
    #[test]
    fn test_thinking_clock_tick() {
        let mut clock = ThinkingClock::new(ThinkingClockConfig {
            enabled: true,
            ..Default::default()
        });

        assert_eq!(clock.tick_count(), 0);
        assert_eq!(clock.tick(), 1);
        assert_eq!(clock.tick(), 2);
        assert_eq!(clock.tick_count(), 2);
    }

    /// A benign response produces a non-escalated result without a reason.
    #[test]
    fn test_process_response_no_escalation() {
        let clock = ThinkingClock::new(ThinkingClockConfig::default());

        let outcome = clock.process_response("OK - everything is fine", 50);
        assert!(!outcome.escalated);
        assert!(outcome.escalation_reason.is_none());
    }

    /// A response with a default trigger escalates and reports the reason.
    #[test]
    fn test_process_response_with_escalation() {
        let clock = ThinkingClock::new(ThinkingClockConfig::default());

        let outcome = clock.process_response("ESCALATE: cron job failed", 100);
        assert!(outcome.escalated);
        assert_eq!(outcome.escalation_reason.as_deref(), Some("cron job failed"));
    }

    /// The clock reports itself disabled for a default config.
    #[test]
    fn test_thinking_clock_disabled() {
        // `enabled` defaults to false.
        let clock = ThinkingClock::new(ThinkingClockConfig::default());
        assert!(!clock.is_enabled());
    }

    /// `interval()` reflects `interval_secs` from the config.
    #[test]
    fn test_interval() {
        let clock = ThinkingClock::new(ThinkingClockConfig {
            interval_secs: 60,
            ..Default::default()
        });
        assert_eq!(clock.interval(), Duration::from_secs(60));
    }
}