Skip to main content

rustant_core/channels/
cdc.rs

1//! Change Data Capture (CDC) for channel message processing.
2//!
3//! Provides stateful polling with cursor-based tracking, reply-chain detection,
4//! and a background polling loop that feeds the classification -> auto-reply pipeline.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use super::style_tracker::CommunicationStyleTracker;
11
12/// Per-channel cursor state for tracking which messages have been processed.
13#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct CdcState {
15    /// Per-channel cursors (channel_name -> cursor string).
16    pub cursors: HashMap<String, String>,
17    /// Message IDs we've sent (for reply-chain detection).
18    /// Maps channel -> Vec<SentMessageRecord>.
19    pub sent_messages: HashMap<String, Vec<SentMessageRecord>>,
20}
21
22/// Record of a message sent by the agent.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct SentMessageRecord {
25    pub message_id: String,
26    pub channel: String,
27    pub timestamp: chrono::DateTime<chrono::Utc>,
28}
29
30impl CdcState {
31    /// Load state from disk.
32    pub fn load(workspace: &Path) -> Self {
33        let path = workspace.join(".rustant").join("cdc").join("state.json");
34        if path.exists() {
35            std::fs::read_to_string(&path)
36                .ok()
37                .and_then(|s| serde_json::from_str(&s).ok())
38                .unwrap_or_default()
39        } else {
40            Self::default()
41        }
42    }
43
44    /// Persist state to disk (atomic write).
45    pub fn save(&self, workspace: &Path) -> Result<(), String> {
46        let dir = workspace.join(".rustant").join("cdc");
47        std::fs::create_dir_all(&dir).map_err(|e| format!("Create CDC dir: {}", e))?;
48        let path = dir.join("state.json");
49        let tmp = path.with_extension("json.tmp");
50        let json = serde_json::to_string_pretty(self)
51            .map_err(|e| format!("Serialize CDC state: {}", e))?;
52        std::fs::write(&tmp, &json).map_err(|e| format!("Write CDC state: {}", e))?;
53        std::fs::rename(&tmp, &path).map_err(|e| format!("Rename CDC state: {}", e))?;
54        Ok(())
55    }
56
57    /// Get the cursor for a specific channel.
58    pub fn cursor_for(&self, channel: &str) -> Option<&str> {
59        self.cursors.get(channel).map(|s| s.as_str())
60    }
61
62    /// Update the cursor for a channel.
63    pub fn set_cursor(&mut self, channel: &str, cursor: String) {
64        self.cursors.insert(channel.to_string(), cursor);
65    }
66
67    /// Record a sent message for reply-chain detection.
68    pub fn record_sent(&mut self, channel: &str, message_id: &str) {
69        let records = self.sent_messages.entry(channel.to_string()).or_default();
70        records.push(SentMessageRecord {
71            message_id: message_id.to_string(),
72            channel: channel.to_string(),
73            timestamp: chrono::Utc::now(),
74        });
75    }
76
77    /// Check if a message is a reply to one of our sent messages.
78    pub fn is_reply_to_us(&self, channel: &str, reply_to: &str) -> bool {
79        self.sent_messages
80            .get(channel)
81            .map(|records| records.iter().any(|r| r.message_id == reply_to))
82            .unwrap_or(false)
83    }
84
85    /// Expire sent message records older than `ttl_days`.
86    pub fn expire_sent_records(&mut self, ttl_days: u64) {
87        let cutoff = chrono::Utc::now() - chrono::Duration::days(ttl_days as i64);
88        for records in self.sent_messages.values_mut() {
89            records.retain(|r| r.timestamp > cutoff);
90        }
91        // Remove empty channels
92        self.sent_messages.retain(|_, v| !v.is_empty());
93    }
94}
95
/// Action emitted by the CDC processor for the REPL/TUI to handle.
///
/// Actions compare by value (`PartialEq`/`Eq`), so callers and tests can
/// assert on the exact action that was produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CdcAction {
    /// Auto-reply ready to send.
    Reply {
        /// Channel to send the reply on.
        channel: String,
        /// Body of the reply.
        text: String,
        /// ID of the message being replied to, if any.
        reply_to: Option<String>,
    },
    /// Message requires user attention (escalation).
    Escalate {
        /// Channel the message arrived on.
        channel: String,
        /// Sender of the message.
        sender: String,
        /// Short summary of the message text.
        summary: String,
    },
    /// Message added to digest for later review.
    AddToDigest {
        /// Channel the message arrived on.
        channel: String,
        /// Sender of the message.
        sender: String,
        /// Short preview of the message text.
        preview: String,
    },
    /// Status update for display.
    StatusUpdate(String),
}
120
121/// Configuration for the CDC polling system.
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct CdcConfig {
124    /// Whether CDC polling is enabled.
125    pub enabled: bool,
126    /// Default polling interval in seconds.
127    pub default_interval_secs: u64,
128    /// Per-channel polling interval overrides.
129    #[serde(default)]
130    pub channel_intervals: HashMap<String, u64>,
131    /// Per-channel enable/disable.
132    #[serde(default)]
133    pub channel_enabled: HashMap<String, bool>,
134    /// How long to keep sent message records (days).
135    pub sent_record_ttl_days: u64,
136    /// Number of messages before generating style facts.
137    pub style_fact_threshold: usize,
138}
139
140impl Default for CdcConfig {
141    fn default() -> Self {
142        Self {
143            enabled: true,
144            default_interval_secs: 60,
145            channel_intervals: HashMap::new(),
146            channel_enabled: HashMap::new(),
147            sent_record_ttl_days: 7,
148            style_fact_threshold: 50,
149        }
150    }
151}
152
153impl CdcConfig {
154    /// Get the polling interval for a specific channel.
155    pub fn interval_for(&self, channel: &str) -> u64 {
156        self.channel_intervals
157            .get(channel)
158            .copied()
159            .unwrap_or(self.default_interval_secs)
160    }
161
162    /// Check if a specific channel is enabled for CDC.
163    pub fn is_channel_enabled(&self, channel: &str) -> bool {
164        self.channel_enabled.get(channel).copied().unwrap_or(true) // enabled by default
165    }
166}
167
168/// The CDC processor that coordinates polling, classification, and action emission.
169pub struct CdcProcessor {
170    pub config: CdcConfig,
171    pub state: CdcState,
172    pub style_tracker: CommunicationStyleTracker,
173    workspace: PathBuf,
174}
175
176impl CdcProcessor {
177    /// Create a new CDC processor.
178    pub fn new(config: CdcConfig, workspace: PathBuf) -> Self {
179        let state = CdcState::load(&workspace);
180        let style_tracker = CommunicationStyleTracker::new(config.style_fact_threshold);
181        Self {
182            config,
183            state,
184            style_tracker,
185            workspace,
186        }
187    }
188
189    /// Process a batch of new messages from a channel.
190    ///
191    /// Returns CDC actions and any style facts generated.
192    pub fn process_messages(
193        &mut self,
194        channel: &str,
195        messages: &[(String, String, String, Option<String>)], // (id, sender, text, reply_to)
196    ) -> (Vec<CdcAction>, Vec<String>) {
197        let mut actions = Vec::new();
198        let mut facts = Vec::new();
199
200        for (msg_id, sender, text, reply_to) in messages {
201            // Track communication style
202            let style_facts = self.style_tracker.track_message(sender, channel, text);
203            facts.extend(style_facts);
204
205            // Check if this is a reply to one of our messages
206            let is_reply_to_us = reply_to
207                .as_ref()
208                .map(|rt| self.state.is_reply_to_us(channel, rt))
209                .unwrap_or(false);
210
211            // Simple heuristic classification
212            if is_reply_to_us {
213                // Replies to us get escalated for attention
214                actions.push(CdcAction::Escalate {
215                    channel: channel.to_string(),
216                    sender: sender.clone(),
217                    summary: truncate(text, 100),
218                });
219            } else if looks_like_question(text) {
220                // Questions might need auto-reply
221                actions.push(CdcAction::Reply {
222                    channel: channel.to_string(),
223                    text: "Received your question. Processing...".to_string(),
224                    reply_to: Some(msg_id.clone()),
225                });
226            } else {
227                // Other messages go to digest
228                actions.push(CdcAction::AddToDigest {
229                    channel: channel.to_string(),
230                    sender: sender.clone(),
231                    preview: truncate(text, 80),
232                });
233            }
234        }
235
236        // Update cursor to the last message ID
237        if let Some((last_id, _, _, _)) = messages.last() {
238            self.state.set_cursor(channel, last_id.clone());
239        }
240
241        // Expire old sent records
242        self.state
243            .expire_sent_records(self.config.sent_record_ttl_days);
244
245        // Persist state
246        if let Err(e) = self.state.save(&self.workspace) {
247            tracing::warn!("Failed to save CDC state: {}", e);
248        }
249
250        (actions, facts)
251    }
252
253    /// Record that we sent a message (for reply-chain detection).
254    pub fn record_sent_message(&mut self, channel: &str, message_id: &str) {
255        self.state.record_sent(channel, message_id);
256        let _ = self.state.save(&self.workspace);
257    }
258
259    /// Get the current CDC state summary for display.
260    pub fn status_summary(&self) -> String {
261        let mut output = String::from("CDC Status:\n");
262        output.push_str(&format!("  Enabled: {}\n", self.config.enabled));
263        output.push_str(&format!(
264            "  Default interval: {}s\n",
265            self.config.default_interval_secs
266        ));
267        output.push_str(&format!(
268            "  Channels with cursors: {}\n",
269            self.state.cursors.len()
270        ));
271        for (ch, cursor) in &self.state.cursors {
272            output.push_str(&format!("    {} -> {}\n", ch, cursor));
273        }
274        output.push_str(&format!(
275            "  Style profiles tracked: {}\n",
276            self.style_tracker.profiles.len()
277        ));
278        output.push_str(&format!(
279            "  Total messages processed: {}\n",
280            self.style_tracker.total_messages
281        ));
282        output
283    }
284}
285
/// Simple heuristic: does this message look like a question?
///
/// A message counts as a question when, ignoring surrounding whitespace, it
/// ends with `?`, or when it starts (case-insensitively, ignoring leading
/// whitespace) with one of a fixed set of opener words followed by a space.
fn looks_like_question(text: &str) -> bool {
    /// Opener words; the trailing space keeps e.g. "candy" from matching "can".
    const QUESTION_OPENERS: [&str; 11] = [
        "can ", "could ", "how ", "what ", "when ", "where ", "why ", "is ", "are ", "do ",
        "does ",
    ];

    if text.trim_end().ends_with('?') {
        return true;
    }

    // Strip leading whitespace so indented messages are classified the same
    // way as unindented ones, and lowercase once instead of per opener.
    let lowered = text.trim_start().to_lowercase();
    QUESTION_OPENERS
        .iter()
        .any(|opener| lowered.starts_with(opener))
}
301
/// Truncate a string to at most `max` bytes, appending "..." when shortened.
///
/// Strings of `max` bytes or fewer are returned unchanged. When `max` falls
/// inside a multi-byte UTF-8 character, the cut moves back to the start of
/// that character, so the function never panics on non-ASCII text.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // Slicing at a non-boundary index panics, so back up to the nearest
    // character boundary. Index 0 is always a boundary, so this terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
310
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Tuple shape accepted by `CdcProcessor::process_messages`.
    type Incoming = (String, String, String, Option<String>);

    /// Create a temporary directory and return it with its canonical path.
    ///
    /// Callers keep the returned `TempDir` bound for the whole test.
    fn scratch_workspace() -> (TempDir, std::path::PathBuf) {
        let guard = TempDir::new().unwrap();
        let root = guard.path().canonicalize().unwrap();
        (guard, root)
    }

    /// Build one `(id, sender, text, reply_to)` message tuple.
    fn incoming(id: &str, sender: &str, text: &str, reply_to: Option<&str>) -> Incoming {
        (
            id.to_string(),
            sender.to_string(),
            text.to_string(),
            reply_to.map(str::to_string),
        )
    }

    #[test]
    fn test_cdc_state_roundtrip() {
        let (_guard, workspace) = scratch_workspace();

        let mut original = CdcState::default();
        original.set_cursor("slack", String::from("123.456"));
        original.record_sent("slack", "789.012");
        original.save(&workspace).unwrap();

        let reloaded = CdcState::load(&workspace);
        assert_eq!(reloaded.cursor_for("slack"), Some("123.456"));
        assert!(reloaded.is_reply_to_us("slack", "789.012"));
    }

    #[test]
    fn test_cdc_config_defaults() {
        let defaults = CdcConfig::default();
        assert!(defaults.enabled);
        assert_eq!(defaults.default_interval_secs, 60);
        assert_eq!(defaults.interval_for("slack"), 60);
        assert!(defaults.is_channel_enabled("slack"));
    }

    #[test]
    fn test_cdc_config_channel_overrides() {
        let mut overridden = CdcConfig::default();
        overridden
            .channel_intervals
            .insert(String::from("slack"), 120);
        overridden.channel_enabled.insert(String::from("irc"), false);

        // Overridden channel vs. one that falls back to the default.
        assert_eq!(overridden.interval_for("slack"), 120);
        assert_eq!(overridden.interval_for("email"), 60);
        // Explicitly disabled channel vs. one with no entry.
        assert!(!overridden.is_channel_enabled("irc"));
        assert!(overridden.is_channel_enabled("slack"));
    }

    #[test]
    fn test_cdc_processor_process_messages() {
        let (_guard, workspace) = scratch_workspace();
        let config = CdcConfig {
            style_fact_threshold: 50,
            ..Default::default()
        };
        let mut processor = CdcProcessor::new(config, workspace);

        let batch = vec![
            incoming("1", "user1", "Can you help me with this?", None),
            incoming("2", "user2", "Just an update on the project", None),
        ];

        let (actions, _facts) = processor.process_messages("slack", &batch);
        assert_eq!(actions.len(), 2);
        // A question becomes a Reply; a plain statement goes to the digest.
        assert!(matches!(&actions[0], CdcAction::Reply { .. }));
        assert!(matches!(&actions[1], CdcAction::AddToDigest { .. }));
    }

    #[test]
    fn test_reply_chain_detection() {
        let (_guard, workspace) = scratch_workspace();
        let mut processor = CdcProcessor::new(CdcConfig::default(), workspace);

        // Pretend we sent a message, then receive a reply that points at it.
        processor.record_sent_message("slack", "our_msg_123");
        let batch = vec![incoming(
            "reply_1",
            "user1",
            "Thanks for that info!",
            Some("our_msg_123"),
        )];

        let (actions, _) = processor.process_messages("slack", &batch);
        assert_eq!(actions.len(), 1);
        assert!(matches!(&actions[0], CdcAction::Escalate { .. }));
    }

    #[test]
    fn test_sent_record_expiry() {
        let mut state = CdcState::default();
        state.record_sent("slack", "old_msg");

        // Backdate the record to 10 days ago so a 7-day TTL expires it.
        let backdated = chrono::Utc::now() - chrono::Duration::days(10);
        if let Some(records) = state.sent_messages.get_mut("slack") {
            records[0].timestamp = backdated;
        }

        state.expire_sent_records(7);
        assert!(!state.is_reply_to_us("slack", "old_msg"));
    }

    #[test]
    fn test_looks_like_question() {
        for question in ["How do I do this?", "Can you help me", "What is the status?"] {
            assert!(looks_like_question(question));
        }
        for statement in ["Just an update", "Thanks for the info"] {
            assert!(!looks_like_question(statement));
        }
    }

    #[test]
    fn test_status_summary() {
        let (_guard, workspace) = scratch_workspace();
        let processor = CdcProcessor::new(CdcConfig::default(), workspace);

        let report = processor.status_summary();
        assert!(report.contains("CDC Status"));
        assert!(report.contains("Enabled: true"));
    }
}