1use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9
10use super::style_tracker::CommunicationStyleTracker;
11
12#[derive(Debug, Clone, Default, Serialize, Deserialize)]
14pub struct CdcState {
15 pub cursors: HashMap<String, String>,
17 pub sent_messages: HashMap<String, Vec<SentMessageRecord>>,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct SentMessageRecord {
25 pub message_id: String,
26 pub channel: String,
27 pub timestamp: chrono::DateTime<chrono::Utc>,
28}
29
30impl CdcState {
31 pub fn load(workspace: &Path) -> Self {
33 let path = workspace.join(".rustant").join("cdc").join("state.json");
34 if path.exists() {
35 std::fs::read_to_string(&path)
36 .ok()
37 .and_then(|s| serde_json::from_str(&s).ok())
38 .unwrap_or_default()
39 } else {
40 Self::default()
41 }
42 }
43
44 pub fn save(&self, workspace: &Path) -> Result<(), String> {
46 let dir = workspace.join(".rustant").join("cdc");
47 std::fs::create_dir_all(&dir).map_err(|e| format!("Create CDC dir: {}", e))?;
48 let path = dir.join("state.json");
49 let tmp = path.with_extension("json.tmp");
50 let json = serde_json::to_string_pretty(self)
51 .map_err(|e| format!("Serialize CDC state: {}", e))?;
52 std::fs::write(&tmp, &json).map_err(|e| format!("Write CDC state: {}", e))?;
53 std::fs::rename(&tmp, &path).map_err(|e| format!("Rename CDC state: {}", e))?;
54 Ok(())
55 }
56
57 pub fn cursor_for(&self, channel: &str) -> Option<&str> {
59 self.cursors.get(channel).map(|s| s.as_str())
60 }
61
62 pub fn set_cursor(&mut self, channel: &str, cursor: String) {
64 self.cursors.insert(channel.to_string(), cursor);
65 }
66
67 pub fn record_sent(&mut self, channel: &str, message_id: &str) {
69 let records = self.sent_messages.entry(channel.to_string()).or_default();
70 records.push(SentMessageRecord {
71 message_id: message_id.to_string(),
72 channel: channel.to_string(),
73 timestamp: chrono::Utc::now(),
74 });
75 }
76
77 pub fn is_reply_to_us(&self, channel: &str, reply_to: &str) -> bool {
79 self.sent_messages
80 .get(channel)
81 .map(|records| records.iter().any(|r| r.message_id == reply_to))
82 .unwrap_or(false)
83 }
84
85 pub fn expire_sent_records(&mut self, ttl_days: u64) {
87 let cutoff = chrono::Utc::now() - chrono::Duration::days(ttl_days as i64);
88 for records in self.sent_messages.values_mut() {
89 records.retain(|r| r.timestamp > cutoff);
90 }
91 self.sent_messages.retain(|_, v| !v.is_empty());
93 }
94}
95
/// An action produced while processing incoming channel messages.
#[derive(Debug, Clone)]
pub enum CdcAction {
    /// Send `text` on `channel`, optionally as a reply to the message whose
    /// id is `reply_to`.
    Reply {
        channel: String,
        text: String,
        reply_to: Option<String>,
    },

    /// Bring a message from `sender` on `channel` to attention; `summary`
    /// carries a shortened copy of its text.
    Escalate {
        channel: String,
        sender: String,
        summary: String,
    },

    /// Queue a message from `sender` on `channel` for a digest; `preview`
    /// carries a shortened copy of its text.
    AddToDigest {
        channel: String,
        sender: String,
        preview: String,
    },

    /// A free-form status line.
    StatusUpdate(String),
}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct CdcConfig {
124 pub enabled: bool,
126 pub default_interval_secs: u64,
128 #[serde(default)]
130 pub channel_intervals: HashMap<String, u64>,
131 #[serde(default)]
133 pub channel_enabled: HashMap<String, bool>,
134 pub sent_record_ttl_days: u64,
136 pub style_fact_threshold: usize,
138}
139
140impl Default for CdcConfig {
141 fn default() -> Self {
142 Self {
143 enabled: true,
144 default_interval_secs: 60,
145 channel_intervals: HashMap::new(),
146 channel_enabled: HashMap::new(),
147 sent_record_ttl_days: 7,
148 style_fact_threshold: 50,
149 }
150 }
151}
152
153impl CdcConfig {
154 pub fn interval_for(&self, channel: &str) -> u64 {
156 self.channel_intervals
157 .get(channel)
158 .copied()
159 .unwrap_or(self.default_interval_secs)
160 }
161
162 pub fn is_channel_enabled(&self, channel: &str) -> bool {
164 self.channel_enabled.get(channel).copied().unwrap_or(true) }
166}
167
168pub struct CdcProcessor {
170 pub config: CdcConfig,
171 pub state: CdcState,
172 pub style_tracker: CommunicationStyleTracker,
173 workspace: PathBuf,
174}
175
176impl CdcProcessor {
177 pub fn new(config: CdcConfig, workspace: PathBuf) -> Self {
179 let state = CdcState::load(&workspace);
180 let style_tracker = CommunicationStyleTracker::new(config.style_fact_threshold);
181 Self {
182 config,
183 state,
184 style_tracker,
185 workspace,
186 }
187 }
188
189 pub fn process_messages(
193 &mut self,
194 channel: &str,
195 messages: &[(String, String, String, Option<String>)], ) -> (Vec<CdcAction>, Vec<String>) {
197 let mut actions = Vec::new();
198 let mut facts = Vec::new();
199
200 for (msg_id, sender, text, reply_to) in messages {
201 let style_facts = self.style_tracker.track_message(sender, channel, text);
203 facts.extend(style_facts);
204
205 let is_reply_to_us = reply_to
207 .as_ref()
208 .map(|rt| self.state.is_reply_to_us(channel, rt))
209 .unwrap_or(false);
210
211 if is_reply_to_us {
213 actions.push(CdcAction::Escalate {
215 channel: channel.to_string(),
216 sender: sender.clone(),
217 summary: truncate(text, 100),
218 });
219 } else if looks_like_question(text) {
220 actions.push(CdcAction::Reply {
222 channel: channel.to_string(),
223 text: "Received your question. Processing...".to_string(),
224 reply_to: Some(msg_id.clone()),
225 });
226 } else {
227 actions.push(CdcAction::AddToDigest {
229 channel: channel.to_string(),
230 sender: sender.clone(),
231 preview: truncate(text, 80),
232 });
233 }
234 }
235
236 if let Some((last_id, _, _, _)) = messages.last() {
238 self.state.set_cursor(channel, last_id.clone());
239 }
240
241 self.state
243 .expire_sent_records(self.config.sent_record_ttl_days);
244
245 if let Err(e) = self.state.save(&self.workspace) {
247 tracing::warn!("Failed to save CDC state: {}", e);
248 }
249
250 (actions, facts)
251 }
252
253 pub fn record_sent_message(&mut self, channel: &str, message_id: &str) {
255 self.state.record_sent(channel, message_id);
256 let _ = self.state.save(&self.workspace);
257 }
258
259 pub fn status_summary(&self) -> String {
261 let mut output = String::from("CDC Status:\n");
262 output.push_str(&format!(" Enabled: {}\n", self.config.enabled));
263 output.push_str(&format!(
264 " Default interval: {}s\n",
265 self.config.default_interval_secs
266 ));
267 output.push_str(&format!(
268 " Channels with cursors: {}\n",
269 self.state.cursors.len()
270 ));
271 for (ch, cursor) in &self.state.cursors {
272 output.push_str(&format!(" {} -> {}\n", ch, cursor));
273 }
274 output.push_str(&format!(
275 " Style profiles tracked: {}\n",
276 self.style_tracker.profiles.len()
277 ));
278 output.push_str(&format!(
279 " Total messages processed: {}\n",
280 self.style_tracker.total_messages
281 ));
282 output
283 }
284}
285
/// Heuristically decides whether `text` reads like a question.
///
/// Returns `true` when the text, ignoring surrounding whitespace, ends with
/// `?`, or when it starts (case-insensitively, ignoring leading whitespace)
/// with one of a fixed set of English question words followed by a space.
fn looks_like_question(text: &str) -> bool {
    // Each prefix keeps its trailing space so that e.g. "candid" does not
    // match "can ".
    const QUESTION_PREFIXES: [&str; 11] = [
        "can ", "could ", "how ", "what ", "when ", "where ", "why ", "is ", "are ", "do ",
        "does ",
    ];

    if text.trim().ends_with('?') {
        return true;
    }

    // Lowercase once instead of once per prefix. Only the start is trimmed
    // so that a trailing space after a lone question word still counts.
    let lowered = text.trim_start().to_lowercase();
    QUESTION_PREFIXES
        .iter()
        .any(|prefix| lowered.starts_with(prefix))
}
301
/// Shortens `s` to at most `max` bytes, appending `...` when anything was
/// cut off.
///
/// Strings of `max` bytes or fewer are returned unchanged. Otherwise the cut
/// is moved back to the nearest UTF-8 character boundary at or below `max`,
/// so multi-byte characters are never split and the function cannot panic.
fn truncate(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }

    // `max < s.len()` here, and index 0 is always a boundary, so this loop
    // terminates with a valid slice end.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
310
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Message tuple layout accepted by `CdcProcessor::process_messages`.
    type Message = (String, String, String, Option<String>);

    /// Creates a scratch workspace. The returned guard owns the directory
    /// and must be kept alive for as long as the path is used.
    fn scratch_workspace() -> (TempDir, std::path::PathBuf) {
        let guard = TempDir::new().unwrap();
        let root = guard.path().canonicalize().unwrap();
        (guard, root)
    }

    #[test]
    fn test_cdc_state_roundtrip() {
        let (_guard, workspace) = scratch_workspace();

        let mut original = CdcState::default();
        original.set_cursor("slack", "123.456".into());
        original.record_sent("slack", "789.012");
        original.save(&workspace).unwrap();

        let restored = CdcState::load(&workspace);
        assert_eq!(restored.cursor_for("slack"), Some("123.456"));
        assert!(restored.is_reply_to_us("slack", "789.012"));
    }

    #[test]
    fn test_cdc_config_defaults() {
        let defaults = CdcConfig::default();

        assert!(defaults.enabled);
        assert_eq!(defaults.default_interval_secs, 60);
        assert_eq!(defaults.interval_for("slack"), 60);
        assert!(defaults.is_channel_enabled("slack"));
    }

    #[test]
    fn test_cdc_config_channel_overrides() {
        let mut config = CdcConfig::default();
        config.channel_intervals.insert("slack".into(), 120);
        config.channel_enabled.insert("irc".into(), false);

        // Overridden channel uses its own interval; others fall back.
        assert_eq!(config.interval_for("slack"), 120);
        assert_eq!(config.interval_for("email"), 60);

        // Only the explicitly disabled channel is off.
        assert!(!config.is_channel_enabled("irc"));
        assert!(config.is_channel_enabled("slack"));
    }

    #[test]
    fn test_cdc_processor_process_messages() {
        let (_guard, workspace) = scratch_workspace();
        let config = CdcConfig {
            style_fact_threshold: 50,
            ..Default::default()
        };
        let mut processor = CdcProcessor::new(config, workspace);

        let question: Message = (
            "1".into(),
            "user1".into(),
            "Can you help me with this?".into(),
            None,
        );
        let update: Message = (
            "2".into(),
            "user2".into(),
            "Just an update on the project".into(),
            None,
        );
        let messages = vec![question, update];

        let (actions, _facts) = processor.process_messages("slack", &messages);
        assert_eq!(actions.len(), 2);
        assert!(matches!(&actions[0], CdcAction::Reply { .. }));
        assert!(matches!(&actions[1], CdcAction::AddToDigest { .. }));
    }

    #[test]
    fn test_reply_chain_detection() {
        let (_guard, workspace) = scratch_workspace();
        let mut processor = CdcProcessor::new(CdcConfig::default(), workspace);

        // Pretend we sent a message, then receive a reply that points at it.
        processor.record_sent_message("slack", "our_msg_123");

        let reply: Message = (
            "reply_1".into(),
            "user1".into(),
            "Thanks for that info!".into(),
            Some("our_msg_123".into()),
        );
        let messages = vec![reply];

        let (actions, _) = processor.process_messages("slack", &messages);
        assert_eq!(actions.len(), 1);
        assert!(matches!(&actions[0], CdcAction::Escalate { .. }));
    }

    #[test]
    fn test_sent_record_expiry() {
        let mut state = CdcState::default();
        state.record_sent("slack", "old_msg");

        // Backdate the record so it falls outside the 7-day TTL.
        if let Some(records) = state.sent_messages.get_mut("slack") {
            records[0].timestamp = chrono::Utc::now() - chrono::Duration::days(10);
        }

        state.expire_sent_records(7);
        assert!(!state.is_reply_to_us("slack", "old_msg"));
    }

    #[test]
    fn test_looks_like_question() {
        for question in ["How do I do this?", "Can you help me", "What is the status?"] {
            assert!(looks_like_question(question));
        }
        for statement in ["Just an update", "Thanks for the info"] {
            assert!(!looks_like_question(statement));
        }
    }

    #[test]
    fn test_status_summary() {
        let (_guard, workspace) = scratch_workspace();
        let processor = CdcProcessor::new(CdcConfig::default(), workspace);

        let summary = processor.status_summary();
        assert!(summary.contains("CDC Status"));
        assert!(summary.contains("Enabled: true"));
    }
}