Skip to main content

crabtalk_runtime/engine/
conversation.rs

1//! Conversation management — lifecycle, persistence, and title generation.
2
3use super::{ConvSlot, Runtime, TopicRouter};
4use crate::{Config, Conversation, ConversationHandle};
5use anyhow::{Result, bail};
6use crabllm_core::{ChatCompletionRequest, Message, Role};
7use memory::{EntryKind, Op};
8use std::sync::{Arc, atomic::Ordering};
9use tokio::sync::Mutex;
10use wcore::{model::HistoryEntry, storage::Storage};
11
12impl<C: Config> Runtime<C> {
13    pub(super) fn new_slot(id: u64, agent: &str, created_by: &str) -> ConvSlot {
14        ConvSlot {
15            agent: agent.to_owned(),
16            created_by: created_by.to_owned(),
17            inner: Arc::new(Mutex::new(Conversation::new(id))),
18        }
19    }
20
21    /// Get or create a conversation for the given (agent, created_by)
22    /// identity. Routing order:
23    ///
24    /// 1. If `(agent, sender)` has an active topic, return that topic's
25    ///    conversation.
26    /// 2. Otherwise return/create the tmp conversation for this pair —
27    ///    in-memory only, no storage I/O, no resume. Topic-bound chats
28    ///    reach storage via `switch_topic`.
29    pub async fn get_or_create_conversation(&self, agent: &str, created_by: &str) -> Result<u64> {
30        if !self.has_agent(agent).await {
31            bail!("agent '{agent}' not registered");
32        }
33
34        let key = (agent.to_owned(), created_by.to_owned());
35
36        // Read-first: the common case is a hit on an existing tmp or
37        // active-topic conversation. Avoid allocating a `TopicRouter`
38        // until we actually need to insert one.
39        if let Some(id) = self
40            .topics
41            .read()
42            .await
43            .get(&key)
44            .and_then(TopicRouter::active_conversation)
45        {
46            return Ok(id);
47        }
48
49        // Reserve an id under the router write lock; release it before
50        // taking the conversations write lock to keep hold-times short.
51        let id = {
52            let mut topics = self.topics.write().await;
53            let router = topics.entry(key).or_default();
54            if let Some(existing) = router.active_conversation() {
55                return Ok(existing);
56            }
57            let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
58            router.tmp = Some(id);
59            id
60        };
61        let slot = Self::new_slot(id, agent, created_by);
62        self.conversations.write().await.insert(id, slot);
63        Ok(id)
64    }
65
66    /// Load a specific conversation by persistent handle.
67    pub async fn load(&self, handle: ConversationHandle) -> Result<u64> {
68        let storage = self.storage();
69        let snapshot = storage
70            .load_session(&handle)?
71            .ok_or_else(|| anyhow::anyhow!("conversation '{}' not found", handle.as_str()))?;
72        if !self.has_agent(&snapshot.meta.agent).await {
73            bail!("agent '{}' not registered", snapshot.meta.agent);
74        }
75        let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
76        let slot = Self::new_slot(id, &snapshot.meta.agent, &snapshot.meta.created_by);
77        {
78            let mut conversation = slot.inner.lock().await;
79            conversation.history =
80                self.resumed_history(snapshot.archive.as_deref(), snapshot.history);
81            conversation.title = snapshot.meta.title;
82            conversation.uptime_secs = snapshot.meta.uptime_secs;
83            conversation.handle = Some(handle);
84        }
85        self.conversations.write().await.insert(id, slot);
86        Ok(id)
87    }
88
89    pub async fn list_active(&self) -> Vec<wcore::protocol::message::ActiveConversationInfo> {
90        // Snapshot the slot metadata and mutex handles first so the
91        // outer read guard isn't held across per-conversation locks —
92        // otherwise a slow conversation would block readers of the
93        // whole map.
94        let slots: Vec<_> = {
95            let conversations = self.conversations.read().await;
96            conversations
97                .values()
98                .map(|s| (s.agent.clone(), s.created_by.clone(), s.inner.clone()))
99                .collect()
100        };
101        let mut infos = Vec::with_capacity(slots.len());
102        for (agent, sender, mutex) in slots {
103            let c = mutex.lock().await;
104            infos.push(wcore::protocol::message::ActiveConversationInfo {
105                agent,
106                sender,
107                message_count: c.history.len() as u64,
108                alive_secs: c.uptime_secs,
109                title: c.title.clone(),
110            });
111        }
112        infos
113    }
114
115    pub async fn close(&self, id: u64) -> bool {
116        self.steering.write().await.remove(&id);
117        let removed = self.conversations.write().await.remove(&id);
118        if let Some(slot) = &removed {
119            let key = (slot.agent.clone(), slot.created_by.clone());
120            let mut topics = self.topics.write().await;
121            if let Some(router) = topics.get_mut(&key) {
122                if router.tmp == Some(id) {
123                    router.tmp = None;
124                }
125                router.by_title.retain(|_, cid| *cid != id);
126                if router
127                    .active
128                    .as_ref()
129                    .is_some_and(|t| !router.by_title.contains_key(t))
130                {
131                    router.active = None;
132                }
133                if router.tmp.is_none() && router.by_title.is_empty() {
134                    topics.remove(&key);
135                }
136            }
137        }
138        removed.is_some()
139    }
140
141    pub async fn steer(&self, conversation_id: u64, content: String) -> Result<()> {
142        let senders = self.steering.read().await;
143        let tx = senders.get(&conversation_id).ok_or_else(|| {
144            anyhow::anyhow!("no active stream for conversation {conversation_id}")
145        })?;
146        tx.send(Some(content))
147            .map_err(|_| anyhow::anyhow!("steering channel closed"))?;
148        Ok(())
149    }
150
151    pub async fn conversation(&self, id: u64) -> Option<Arc<Mutex<Conversation>>> {
152        self.conversations
153            .read()
154            .await
155            .get(&id)
156            .map(|slot| slot.inner.clone())
157    }
158
159    /// Look up a conversation slot's `(agent, sender, mutex)` triple.
160    /// Returns `None` when the conversation id is not registered — the
161    /// execution paths all need this handshake before locking the
162    /// conversation mutex.
163    pub(crate) async fn acquire_slot(
164        &self,
165        id: u64,
166    ) -> Option<(String, String, Arc<Mutex<Conversation>>)> {
167        self.conversations
168            .read()
169            .await
170            .get(&id)
171            .map(ConvSlot::parts)
172    }
173
174    pub async fn conversations(&self) -> Vec<Arc<Mutex<Conversation>>> {
175        self.conversations
176            .read()
177            .await
178            .values()
179            .map(|slot| slot.inner.clone())
180            .collect()
181    }
182
183    pub async fn conversation_count(&self) -> usize {
184        self.conversations.read().await.len()
185    }
186
187    pub async fn conversation_id(&self, agent: &str, sender: &str) -> Option<u64> {
188        let conversations = self.conversations.read().await;
189        for (id, slot) in conversations.iter() {
190            if slot.agent == agent && slot.created_by == sender {
191                return Some(*id);
192            }
193        }
194        None
195    }
196
197    pub async fn compact(&self, conversation_id: u64) -> Option<String> {
198        // Release the conversations read lock before the per-conversation
199        // mutex await — otherwise readers queue behind a potentially
200        // contended inner lock.
201        let (agent_name, conversation_mutex) = {
202            let conversations = self.conversations.read().await;
203            let slot = conversations.get(&conversation_id)?;
204            (slot.agent.clone(), slot.inner.clone())
205        };
206        let history = {
207            let conversation = conversation_mutex.lock().await;
208            if conversation.history.is_empty() {
209                return None;
210            }
211            conversation.history.clone()
212        };
213        self.resolve_agent(&agent_name)
214            .await?
215            .compact(&history)
216            .await
217    }
218
219    pub async fn transfer_to<C2: Config>(&self, dest: &mut Runtime<C2>) {
220        let conversations = self.conversations.read().await;
221        let dest_conversations = dest.conversations.get_mut();
222        for (id, slot) in conversations.iter() {
223            dest_conversations.insert(*id, slot.clone());
224        }
225        let next = self.next_conversation_id.load(Ordering::Relaxed);
226        dest.next_conversation_id.store(next, Ordering::Relaxed);
227    }
228
229    /// Build the conversation's replay history from storage's post-compact
230    /// messages plus the Archive entry's content. A missing archive entry
231    /// (memory wiped, different machine, etc.) injects a visible placeholder
232    /// so the model can acknowledge the gap instead of silently truncating
233    /// the user's context.
234    pub(super) fn resumed_history(
235        &self,
236        archive: Option<&str>,
237        mut history: Vec<HistoryEntry>,
238    ) -> Vec<HistoryEntry> {
239        let Some(name) = archive else { return history };
240        let content = {
241            let mem = self.memory.read();
242            mem.get(name).map(|e| e.content.clone())
243        };
244        let prefix = content.unwrap_or_else(|| {
245            tracing::warn!("resume: archive '{name}' missing from memory");
246            format!("[archived context unavailable: {name}]")
247        });
248        let mut out = Vec::with_capacity(history.len() + 1);
249        out.push(HistoryEntry::user(prefix));
250        out.append(&mut history);
251        out
252    }
253
254    /// Write a compaction summary to memory as an `Archive` entry,
255    /// named `{topic-slug}-{n}` where `n` is the next free sequence
256    /// number for this topic. Older archives stay searchable via
257    /// `recall`, so a long-running topic's phases don't get
258    /// overwritten. Returns the generated name, or `None` on failure
259    /// — the caller must skip the compact marker so a resume can't
260    /// dangle.
261    fn write_archive(&self, topic: &str, summary: String) -> Option<String> {
262        let slug = wcore::sender_slug(topic);
263        let prefix = format!("{slug}-");
264        let mut mem = self.memory.write();
265        // Scan and insert under the same write lock — two concurrent
266        // compactions can't both pick `seq` and collide.
267        let next_seq = mem
268            .list()
269            .filter(|e| e.kind == EntryKind::Archive && e.name.starts_with(&prefix))
270            .filter_map(|e| {
271                let suffix = &e.name[prefix.len()..];
272                let n: u32 = suffix.parse().ok()?;
273                // Reject non-canonical forms ("02", "+1", etc.) so a
274                // future `{slug}-2` can't collide with a historic
275                // `{slug}-02`.
276                (n.to_string() == suffix).then_some(n)
277            })
278            .max()
279            .unwrap_or(0)
280            + 1;
281        let name = format!("{slug}-{next_seq}");
282        match mem.apply(Op::Add {
283            name: name.clone(),
284            content: summary,
285            aliases: vec![],
286            kind: EntryKind::Archive,
287        }) {
288            Ok(()) => Some(name),
289            Err(e) => {
290                tracing::error!("archive write failed: {e}");
291                None
292            }
293        }
294    }
295
296    /// Ensure a topic-bound conversation has a session handle, creating
297    /// one via the repo if needed. Tmp chats (no topic) are in-memory
298    /// only — never persisted, never assigned a handle.
299    fn ensure_handle(&self, conversation: &mut Conversation, agent: &str, created_by: &str) {
300        if conversation.handle.is_some() || conversation.topic.is_none() {
301            return;
302        }
303        let storage = self.storage();
304        match storage.create_session(agent, created_by) {
305            Ok(handle) => conversation.handle = Some(handle),
306            Err(e) => tracing::warn!("failed to create session: {e}"),
307        }
308    }
309
310    /// Post-run tail shared by `send_to`, `stream_to`, and
311    /// `guest_stream_to`: update uptime, persist, and kick off title
312    /// generation if the conversation has a titleable exchange and no
313    /// title yet.
314    #[allow(clippy::too_many_arguments)]
315    pub(crate) fn finalize_run(
316        &self,
317        conversation_id: u64,
318        conversation: &mut Conversation,
319        conversation_mutex: Arc<Mutex<Conversation>>,
320        agent: &str,
321        created_by: &str,
322        run_start: std::time::Instant,
323        pre_run_len: usize,
324        compact_summary: Option<String>,
325        event_trace: &[wcore::EventLine],
326    ) {
327        conversation.uptime_secs += run_start.elapsed().as_secs();
328        self.persist_messages(
329            conversation,
330            agent,
331            created_by,
332            pre_run_len,
333            compact_summary,
334            event_trace,
335        );
336        if conversation.title.is_empty() && conversation.history.len() >= 2 {
337            self.spawn_title_generation(conversation_id, agent, created_by, conversation_mutex);
338        }
339    }
340
341    /// Persist messages to the session repo. Handles ensure_handle,
342    /// compact markers, and meta updates.
343    pub(crate) fn persist_messages(
344        &self,
345        conversation: &mut Conversation,
346        agent: &str,
347        created_by: &str,
348        pre_run_len: usize,
349        compact_summary: Option<String>,
350        event_trace: &[wcore::EventLine],
351    ) {
352        self.ensure_handle(conversation, agent, created_by);
353        let Some(ref handle) = conversation.handle else {
354            return;
355        };
356        let storage = self.storage();
357
358        if let Some(summary) = compact_summary {
359            // A persisted conversation is always topic-bound —
360            // `ensure_handle` refuses to create a handle for tmp chats.
361            let topic = conversation
362                .topic
363                .clone()
364                .expect("persisted conversation without a topic");
365            // Archive first — if this fails, don't write a dangling
366            // marker that points at nothing.
367            if let Some(archive_name) = self.write_archive(&topic, summary) {
368                let _ = storage.append_session_compact(handle, &archive_name);
369                if conversation.history.len() > 1 {
370                    let tail: Vec<_> = conversation.history[1..]
371                        .iter()
372                        .filter(|e| !e.auto_injected)
373                        .cloned()
374                        .collect();
375                    let _ = storage.append_session_messages(handle, &tail);
376                }
377            }
378        } else {
379            let new_entries: Vec<_> = conversation.history[pre_run_len..]
380                .iter()
381                .filter(|e| !e.auto_injected)
382                .cloned()
383                .collect();
384            let _ = storage.append_session_messages(handle, &new_entries);
385        }
386        if !event_trace.is_empty() {
387            let _ = storage.append_session_events(handle, event_trace);
388        }
389        let _ = storage.update_session_meta(handle, &conversation.meta(agent, created_by));
390    }
391
392    pub(crate) fn spawn_title_generation(
393        &self,
394        _conversation_id: u64,
395        agent_name: &str,
396        created_by: &str,
397        conversation_mutex: Arc<Mutex<Conversation>>,
398    ) {
399        let model = self.model.clone();
400        let storage = self.storage().clone();
401        let agent_name = agent_name.to_owned();
402        let created_by = created_by.to_owned();
403        let model_name = self
404            .agents
405            .read()
406            .get(agent_name.as_str())
407            .map(|a| a.config.model.clone())
408            .unwrap_or_default();
409        if model_name.is_empty() {
410            return;
411        }
412        tokio::spawn(async move {
413            let (user_msg, assistant_msg) = {
414                let conversation = conversation_mutex.lock().await;
415                let user = conversation
416                    .history
417                    .iter()
418                    .find(|e| *e.role() == Role::User && !e.auto_injected)
419                    .map(|e| e.text().to_owned());
420                let assistant = conversation
421                    .history
422                    .iter()
423                    .find(|e| *e.role() == Role::Assistant)
424                    .map(|e| e.text().to_owned());
425                (user, assistant)
426            };
427
428            let Some(user) = user_msg else { return };
429            let Some(assistant) = assistant_msg else {
430                return;
431            };
432
433            let user_snippet: String = user.chars().take(200).collect();
434            let assistant_snippet: String = assistant.chars().take(200).collect();
435
436            let prompt = format!(
437                "Summarize this conversation in 3-6 words as a short title. \
438                 Return ONLY the title, nothing else.\n\n\
439                 User: {user_snippet}\nAssistant: {assistant_snippet}"
440            );
441
442            let request = ChatCompletionRequest {
443                model: model_name,
444                messages: vec![Message::user(&prompt)],
445                temperature: None,
446                top_p: None,
447                max_tokens: None,
448                stream: None,
449                stop: None,
450                tools: None,
451                tool_choice: None,
452                frequency_penalty: None,
453                presence_penalty: None,
454                seed: None,
455                user: None,
456                reasoning_effort: None,
457                extra: Default::default(),
458            };
459
460            match model.send_ct(request).await {
461                Ok(response) => {
462                    if let Some(title) = response.content() {
463                        let title = title.trim().trim_matches('"').to_string();
464                        if !title.is_empty() {
465                            let mut conversation = conversation_mutex.lock().await;
466                            if conversation.title.is_empty() {
467                                conversation.title = title;
468                                if let Some(ref handle) = conversation.handle {
469                                    let _ = storage.update_session_meta(
470                                        handle,
471                                        &conversation.meta(&agent_name, &created_by),
472                                    );
473                                }
474                            }
475                        }
476                    }
477                }
478                Err(e) => {
479                    tracing::debug!("title generation failed: {e}");
480                }
481            }
482        });
483    }
484}