Skip to main content

crabtalk_runtime/engine/
topic.rs

1//! Topics — per-(agent, sender) conversation partitioning. See
2//! [RFC 0171](https://github.com/crabtalk/crabtalk/issues/171). One
3//! `(agent, sender)` pair maps to N conversations keyed by topic
4//! title, plus an active-topic pointer. Untopicked chats are tmp and
5//! live only in [`TopicRouter::tmp`]; they never reach storage.
6
7use super::Runtime;
8use crate::{Config, ConversationHandle};
9use anyhow::{Result, bail};
10use memory::{EntryKind, Op};
11use std::{collections::HashMap, sync::atomic::Ordering};
12use wcore::storage::Storage;
13
14/// Per-(agent, sender) topic routing. `active = None` means the caller
15/// is on a tmp chat (no topic). Tmp chats have their own `ConvSlot` but
16/// their id is tracked here so `get_or_create_conversation` can find
17/// them without scanning.
18#[derive(Default)]
19pub struct TopicRouter {
20    pub(super) by_title: HashMap<String, u64>,
21    pub(super) active: Option<String>,
22    pub(super) tmp: Option<u64>,
23}
24
25impl TopicRouter {
26    /// Resolve the conversation this router currently routes to:
27    /// the active topic's conversation if one is set, otherwise the
28    /// tmp conversation if one exists.
29    pub(super) fn active_conversation(&self) -> Option<u64> {
30        self.active
31            .as_ref()
32            .and_then(|t| self.by_title.get(t).copied())
33            .or(self.tmp)
34    }
35}
36
37/// Outcome of `switch_topic`. `resumed = true` means the topic
38/// already existed (in the router or on disk); `false` means it was
39/// freshly created.
40#[derive(Debug, Clone, Copy)]
41pub struct SwitchOutcome {
42    pub conversation_id: u64,
43    pub resumed: bool,
44}
45
46impl<C: Config> Runtime<C> {
47    /// Switch the active topic for `(agent, sender)`. Creates a new
48    /// topic conversation if the title doesn't exist yet; resumes the
49    /// existing one otherwise. When creating, writes an
50    /// `EntryKind::Topic` memory entry (unless one already exists for
51    /// the title) so the topic is searchable via `search_topics`.
52    ///
53    /// Returns the target `conversation_id` in the outcome. The caller
54    /// is responsible for telling the user which conversation to route
55    /// the next message to — this call only updates runtime state.
56    pub async fn switch_topic(
57        &self,
58        agent: &str,
59        sender: &str,
60        title: &str,
61        description: Option<&str>,
62    ) -> Result<SwitchOutcome> {
63        if !self.has_agent(agent).await {
64            bail!("agent '{agent}' not registered");
65        }
66        if title.is_empty() {
67            bail!("topic title cannot be empty");
68        }
69
70        let key = (agent.to_owned(), sender.to_owned());
71
72        // Reserve the slot under the router lock — any concurrent
73        // caller that races us observes the reservation on the next
74        // lookup and resumes to our conversation instead of creating a
75        // duplicate session.
76        let id = {
77            let mut topics = self.topics.write().await;
78            let router = topics.entry(key.clone()).or_default();
79            if let Some(id) = router.by_title.get(title).copied() {
80                router.active = Some(title.to_owned());
81                return Ok(SwitchOutcome {
82                    conversation_id: id,
83                    resumed: true,
84                });
85            }
86            let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
87            router.by_title.insert(title.to_owned(), id);
88            router.active = Some(title.to_owned());
89            id
90        };
91
92        match self
93            .finalize_switch(agent, sender, title, description, id)
94            .await
95        {
96            Ok(outcome) => Ok(outcome),
97            Err(e) => {
98                self.rollback_reservation(&key, title).await;
99                Err(e)
100            }
101        }
102    }
103
104    /// Cold-path body of `switch_topic`. Called after the slot
105    /// has been reserved; any error here triggers a router rollback.
106    async fn finalize_switch(
107        &self,
108        agent: &str,
109        sender: &str,
110        title: &str,
111        description: Option<&str>,
112        id: u64,
113    ) -> Result<SwitchOutcome> {
114        let existing = self.find_session(agent, sender, title);
115        let resumed = existing.is_some();
116
117        if !resumed {
118            let desc = description.ok_or_else(|| {
119                anyhow::anyhow!("description required when creating a new topic '{title}'")
120            })?;
121            self.ensure_entry(title, desc);
122        }
123
124        let slot = Self::new_slot(id, agent, sender);
125        {
126            let mut conversation = slot.inner.lock().await;
127            conversation.topic = Some(title.to_owned());
128            match existing {
129                Some((handle, snapshot)) => {
130                    conversation.history =
131                        self.resumed_history(snapshot.archive.as_deref(), snapshot.history);
132                    conversation.title = snapshot.meta.title;
133                    conversation.uptime_secs = snapshot.meta.uptime_secs;
134                    conversation.handle = Some(handle);
135                }
136                None => {
137                    let storage = self.storage();
138                    let handle = storage.create_session(agent, sender)?;
139                    // Stamp meta so the new session carries its topic
140                    // from the first write — a missing `topic` here
141                    // would make the session invisible to future
142                    // `find_session` scans. `conversation.meta`
143                    // already reflects the topic we set above.
144                    storage.update_session_meta(&handle, &conversation.meta(agent, sender))?;
145                    conversation.handle = Some(handle);
146                }
147            }
148        }
149
150        self.conversations.write().await.insert(id, slot);
151        Ok(SwitchOutcome {
152            conversation_id: id,
153            resumed,
154        })
155    }
156
157    async fn rollback_reservation(&self, key: &(String, String), title: &str) {
158        let mut topics = self.topics.write().await;
159        let Some(router) = topics.get_mut(key) else {
160            return;
161        };
162        router.by_title.remove(title);
163        if router.active.as_deref() == Some(title) {
164            router.active = None;
165        }
166        if router.by_title.is_empty() && router.active.is_none() && router.tmp.is_none() {
167            topics.remove(key);
168        }
169    }
170
171    /// Scan storage for a session matching `(agent, sender, topic)`.
172    /// Blocking I/O; call from outside the runtime locks.
173    fn find_session(
174        &self,
175        agent: &str,
176        sender: &str,
177        title: &str,
178    ) -> Option<(ConversationHandle, wcore::storage::SessionSnapshot)> {
179        let storage = self.storage();
180        let summaries = storage.list_sessions().ok()?;
181        let mut best: Option<(ConversationHandle, wcore::storage::ConversationMeta)> = None;
182        for summary in summaries {
183            if summary.meta.agent != agent || summary.meta.created_by != sender {
184                continue;
185            }
186            if summary.meta.topic.as_deref() != Some(title) {
187                continue;
188            }
189            // Later sessions win — `created_at` is an RFC3339 string so
190            // lexicographic comparison is chronological.
191            if best
192                .as_ref()
193                .is_none_or(|(_, meta)| summary.meta.created_at > meta.created_at)
194            {
195                best = Some((summary.handle, summary.meta));
196            }
197        }
198        let (handle, _) = best?;
199        let snapshot = storage.load_session(&handle).ok().flatten()?;
200        Some((handle, snapshot))
201    }
202
203    /// Write the Topic memory entry if it doesn't already exist.
204    /// Duplicate-name errors are ignored — a prior process may have
205    /// created the same topic.
206    fn ensure_entry(&self, title: &str, description: &str) {
207        let mut mem = self.memory.write();
208        if mem.get(title).is_some() {
209            return;
210        }
211        if let Err(e) = mem.apply(Op::Add {
212            name: title.to_owned(),
213            content: description.to_owned(),
214            aliases: vec![],
215            kind: EntryKind::Topic,
216        }) {
217            tracing::warn!("topic entry write failed: {e}");
218        }
219    }
220}