crabtalk_runtime/engine/
topic.rs1use super::Runtime;
8use crate::{Config, ConversationHandle};
9use anyhow::{Result, bail};
10use memory::{EntryKind, Op};
11use std::{collections::HashMap, sync::atomic::Ordering};
12use wcore::storage::Storage;
13
14#[derive(Default)]
19pub struct TopicRouter {
20 pub(super) by_title: HashMap<String, u64>,
21 pub(super) active: Option<String>,
22 pub(super) tmp: Option<u64>,
23}
24
25impl TopicRouter {
26 pub(super) fn active_conversation(&self) -> Option<u64> {
30 self.active
31 .as_ref()
32 .and_then(|t| self.by_title.get(t).copied())
33 .or(self.tmp)
34 }
35}
36
37#[derive(Debug, Clone, Copy)]
41pub struct SwitchOutcome {
42 pub conversation_id: u64,
43 pub resumed: bool,
44}
45
46impl<C: Config> Runtime<C> {
47 pub async fn switch_topic(
57 &self,
58 agent: &str,
59 sender: &str,
60 title: &str,
61 description: Option<&str>,
62 ) -> Result<SwitchOutcome> {
63 if !self.has_agent(agent).await {
64 bail!("agent '{agent}' not registered");
65 }
66 if title.is_empty() {
67 bail!("topic title cannot be empty");
68 }
69
70 let key = (agent.to_owned(), sender.to_owned());
71
72 let id = {
77 let mut topics = self.topics.write().await;
78 let router = topics.entry(key.clone()).or_default();
79 if let Some(id) = router.by_title.get(title).copied() {
80 router.active = Some(title.to_owned());
81 return Ok(SwitchOutcome {
82 conversation_id: id,
83 resumed: true,
84 });
85 }
86 let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
87 router.by_title.insert(title.to_owned(), id);
88 router.active = Some(title.to_owned());
89 id
90 };
91
92 match self
93 .finalize_switch(agent, sender, title, description, id)
94 .await
95 {
96 Ok(outcome) => Ok(outcome),
97 Err(e) => {
98 self.rollback_reservation(&key, title).await;
99 Err(e)
100 }
101 }
102 }
103
104 async fn finalize_switch(
107 &self,
108 agent: &str,
109 sender: &str,
110 title: &str,
111 description: Option<&str>,
112 id: u64,
113 ) -> Result<SwitchOutcome> {
114 let existing = self.find_session(agent, sender, title);
115 let resumed = existing.is_some();
116
117 if !resumed {
118 let desc = description.ok_or_else(|| {
119 anyhow::anyhow!("description required when creating a new topic '{title}'")
120 })?;
121 self.ensure_entry(title, desc);
122 }
123
124 let slot = Self::new_slot(id, agent, sender);
125 {
126 let mut conversation = slot.inner.lock().await;
127 conversation.topic = Some(title.to_owned());
128 match existing {
129 Some((handle, snapshot)) => {
130 conversation.history =
131 self.resumed_history(snapshot.archive.as_deref(), snapshot.history);
132 conversation.title = snapshot.meta.title;
133 conversation.uptime_secs = snapshot.meta.uptime_secs;
134 conversation.handle = Some(handle);
135 }
136 None => {
137 let storage = self.storage();
138 let handle = storage.create_session(agent, sender)?;
139 storage.update_session_meta(&handle, &conversation.meta(agent, sender))?;
145 conversation.handle = Some(handle);
146 }
147 }
148 }
149
150 self.conversations.write().await.insert(id, slot);
151 Ok(SwitchOutcome {
152 conversation_id: id,
153 resumed,
154 })
155 }
156
157 async fn rollback_reservation(&self, key: &(String, String), title: &str) {
158 let mut topics = self.topics.write().await;
159 let Some(router) = topics.get_mut(key) else {
160 return;
161 };
162 router.by_title.remove(title);
163 if router.active.as_deref() == Some(title) {
164 router.active = None;
165 }
166 if router.by_title.is_empty() && router.active.is_none() && router.tmp.is_none() {
167 topics.remove(key);
168 }
169 }
170
171 fn find_session(
174 &self,
175 agent: &str,
176 sender: &str,
177 title: &str,
178 ) -> Option<(ConversationHandle, wcore::storage::SessionSnapshot)> {
179 let storage = self.storage();
180 let summaries = storage.list_sessions().ok()?;
181 let mut best: Option<(ConversationHandle, wcore::storage::ConversationMeta)> = None;
182 for summary in summaries {
183 if summary.meta.agent != agent || summary.meta.created_by != sender {
184 continue;
185 }
186 if summary.meta.topic.as_deref() != Some(title) {
187 continue;
188 }
189 if best
192 .as_ref()
193 .is_none_or(|(_, meta)| summary.meta.created_at > meta.created_at)
194 {
195 best = Some((summary.handle, summary.meta));
196 }
197 }
198 let (handle, _) = best?;
199 let snapshot = storage.load_session(&handle).ok().flatten()?;
200 Some((handle, snapshot))
201 }
202
203 fn ensure_entry(&self, title: &str, description: &str) {
207 let mut mem = self.memory.write();
208 if mem.get(title).is_some() {
209 return;
210 }
211 if let Err(e) = mem.apply(Op::Add {
212 name: title.to_owned(),
213 content: description.to_owned(),
214 aliases: vec![],
215 kind: EntryKind::Topic,
216 }) {
217 tracing::warn!("topic entry write failed: {e}");
218 }
219 }
220}