1use super::{ConvSlot, Runtime, TopicRouter};
4use crate::{Config, Conversation, ConversationHandle};
5use anyhow::{Result, bail};
6use crabllm_core::{ChatCompletionRequest, Message, Role};
7use memory::{EntryKind, Op};
8use std::sync::{Arc, atomic::Ordering};
9use tokio::sync::Mutex;
10use wcore::{model::HistoryEntry, storage::Storage};
11
12impl<C: Config> Runtime<C> {
13 pub(super) fn new_slot(id: u64, agent: &str, created_by: &str) -> ConvSlot {
14 ConvSlot {
15 agent: agent.to_owned(),
16 created_by: created_by.to_owned(),
17 inner: Arc::new(Mutex::new(Conversation::new(id))),
18 }
19 }
20
21 pub async fn get_or_create_conversation(&self, agent: &str, created_by: &str) -> Result<u64> {
30 if !self.has_agent(agent).await {
31 bail!("agent '{agent}' not registered");
32 }
33
34 let key = (agent.to_owned(), created_by.to_owned());
35
36 if let Some(id) = self
40 .topics
41 .read()
42 .await
43 .get(&key)
44 .and_then(TopicRouter::active_conversation)
45 {
46 return Ok(id);
47 }
48
49 let id = {
52 let mut topics = self.topics.write().await;
53 let router = topics.entry(key).or_default();
54 if let Some(existing) = router.active_conversation() {
55 return Ok(existing);
56 }
57 let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
58 router.tmp = Some(id);
59 id
60 };
61 let slot = Self::new_slot(id, agent, created_by);
62 self.conversations.write().await.insert(id, slot);
63 Ok(id)
64 }
65
66 pub async fn load(&self, handle: ConversationHandle) -> Result<u64> {
68 let storage = self.storage();
69 let snapshot = storage
70 .load_session(&handle)?
71 .ok_or_else(|| anyhow::anyhow!("conversation '{}' not found", handle.as_str()))?;
72 if !self.has_agent(&snapshot.meta.agent).await {
73 bail!("agent '{}' not registered", snapshot.meta.agent);
74 }
75 let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
76 let slot = Self::new_slot(id, &snapshot.meta.agent, &snapshot.meta.created_by);
77 {
78 let mut conversation = slot.inner.lock().await;
79 conversation.history =
80 self.resumed_history(snapshot.archive.as_deref(), snapshot.history);
81 conversation.title = snapshot.meta.title;
82 conversation.uptime_secs = snapshot.meta.uptime_secs;
83 conversation.handle = Some(handle);
84 }
85 self.conversations.write().await.insert(id, slot);
86 Ok(id)
87 }
88
89 pub async fn list_active(&self) -> Vec<wcore::protocol::message::ActiveConversationInfo> {
90 let slots: Vec<_> = {
95 let conversations = self.conversations.read().await;
96 conversations
97 .values()
98 .map(|s| (s.agent.clone(), s.created_by.clone(), s.inner.clone()))
99 .collect()
100 };
101 let mut infos = Vec::with_capacity(slots.len());
102 for (agent, sender, mutex) in slots {
103 let c = mutex.lock().await;
104 infos.push(wcore::protocol::message::ActiveConversationInfo {
105 agent,
106 sender,
107 message_count: c.history.len() as u64,
108 alive_secs: c.uptime_secs,
109 title: c.title.clone(),
110 });
111 }
112 infos
113 }
114
115 pub async fn close(&self, id: u64) -> bool {
116 self.steering.write().await.remove(&id);
117 let removed = self.conversations.write().await.remove(&id);
118 if let Some(slot) = &removed {
119 let key = (slot.agent.clone(), slot.created_by.clone());
120 let mut topics = self.topics.write().await;
121 if let Some(router) = topics.get_mut(&key) {
122 if router.tmp == Some(id) {
123 router.tmp = None;
124 }
125 router.by_title.retain(|_, cid| *cid != id);
126 if router
127 .active
128 .as_ref()
129 .is_some_and(|t| !router.by_title.contains_key(t))
130 {
131 router.active = None;
132 }
133 if router.tmp.is_none() && router.by_title.is_empty() {
134 topics.remove(&key);
135 }
136 }
137 }
138 removed.is_some()
139 }
140
141 pub async fn steer(&self, conversation_id: u64, content: String) -> Result<()> {
142 let senders = self.steering.read().await;
143 let tx = senders.get(&conversation_id).ok_or_else(|| {
144 anyhow::anyhow!("no active stream for conversation {conversation_id}")
145 })?;
146 tx.send(Some(content))
147 .map_err(|_| anyhow::anyhow!("steering channel closed"))?;
148 Ok(())
149 }
150
151 pub async fn conversation(&self, id: u64) -> Option<Arc<Mutex<Conversation>>> {
152 self.conversations
153 .read()
154 .await
155 .get(&id)
156 .map(|slot| slot.inner.clone())
157 }
158
159 pub(crate) async fn acquire_slot(
164 &self,
165 id: u64,
166 ) -> Option<(String, String, Arc<Mutex<Conversation>>)> {
167 self.conversations
168 .read()
169 .await
170 .get(&id)
171 .map(ConvSlot::parts)
172 }
173
174 pub async fn conversations(&self) -> Vec<Arc<Mutex<Conversation>>> {
175 self.conversations
176 .read()
177 .await
178 .values()
179 .map(|slot| slot.inner.clone())
180 .collect()
181 }
182
183 pub async fn conversation_count(&self) -> usize {
184 self.conversations.read().await.len()
185 }
186
187 pub async fn conversation_id(&self, agent: &str, sender: &str) -> Option<u64> {
188 let conversations = self.conversations.read().await;
189 for (id, slot) in conversations.iter() {
190 if slot.agent == agent && slot.created_by == sender {
191 return Some(*id);
192 }
193 }
194 None
195 }
196
197 pub async fn compact(&self, conversation_id: u64) -> Option<String> {
198 let (agent_name, conversation_mutex) = {
202 let conversations = self.conversations.read().await;
203 let slot = conversations.get(&conversation_id)?;
204 (slot.agent.clone(), slot.inner.clone())
205 };
206 let history = {
207 let conversation = conversation_mutex.lock().await;
208 if conversation.history.is_empty() {
209 return None;
210 }
211 conversation.history.clone()
212 };
213 self.resolve_agent(&agent_name)
214 .await?
215 .compact(&history)
216 .await
217 }
218
219 pub async fn transfer_to<C2: Config>(&self, dest: &mut Runtime<C2>) {
220 let conversations = self.conversations.read().await;
221 let dest_conversations = dest.conversations.get_mut();
222 for (id, slot) in conversations.iter() {
223 dest_conversations.insert(*id, slot.clone());
224 }
225 let next = self.next_conversation_id.load(Ordering::Relaxed);
226 dest.next_conversation_id.store(next, Ordering::Relaxed);
227 }
228
229 pub(super) fn resumed_history(
235 &self,
236 archive: Option<&str>,
237 mut history: Vec<HistoryEntry>,
238 ) -> Vec<HistoryEntry> {
239 let Some(name) = archive else { return history };
240 let content = {
241 let mem = self.memory.read();
242 mem.get(name).map(|e| e.content.clone())
243 };
244 let prefix = content.unwrap_or_else(|| {
245 tracing::warn!("resume: archive '{name}' missing from memory");
246 format!("[archived context unavailable: {name}]")
247 });
248 let mut out = Vec::with_capacity(history.len() + 1);
249 out.push(HistoryEntry::user(prefix));
250 out.append(&mut history);
251 out
252 }
253
254 fn write_archive(&self, topic: &str, summary: String) -> Option<String> {
262 let slug = wcore::sender_slug(topic);
263 let prefix = format!("{slug}-");
264 let mut mem = self.memory.write();
265 let next_seq = mem
268 .list()
269 .filter(|e| e.kind == EntryKind::Archive && e.name.starts_with(&prefix))
270 .filter_map(|e| {
271 let suffix = &e.name[prefix.len()..];
272 let n: u32 = suffix.parse().ok()?;
273 (n.to_string() == suffix).then_some(n)
277 })
278 .max()
279 .unwrap_or(0)
280 + 1;
281 let name = format!("{slug}-{next_seq}");
282 match mem.apply(Op::Add {
283 name: name.clone(),
284 content: summary,
285 aliases: vec![],
286 kind: EntryKind::Archive,
287 }) {
288 Ok(()) => Some(name),
289 Err(e) => {
290 tracing::error!("archive write failed: {e}");
291 None
292 }
293 }
294 }
295
296 fn ensure_handle(&self, conversation: &mut Conversation, agent: &str, created_by: &str) {
300 if conversation.handle.is_some() || conversation.topic.is_none() {
301 return;
302 }
303 let storage = self.storage();
304 match storage.create_session(agent, created_by) {
305 Ok(handle) => conversation.handle = Some(handle),
306 Err(e) => tracing::warn!("failed to create session: {e}"),
307 }
308 }
309
310 #[allow(clippy::too_many_arguments)]
315 pub(crate) fn finalize_run(
316 &self,
317 conversation_id: u64,
318 conversation: &mut Conversation,
319 conversation_mutex: Arc<Mutex<Conversation>>,
320 agent: &str,
321 created_by: &str,
322 run_start: std::time::Instant,
323 pre_run_len: usize,
324 compact_summary: Option<String>,
325 event_trace: &[wcore::EventLine],
326 ) {
327 conversation.uptime_secs += run_start.elapsed().as_secs();
328 self.persist_messages(
329 conversation,
330 agent,
331 created_by,
332 pre_run_len,
333 compact_summary,
334 event_trace,
335 );
336 if conversation.title.is_empty() && conversation.history.len() >= 2 {
337 self.spawn_title_generation(conversation_id, agent, created_by, conversation_mutex);
338 }
339 }
340
341 pub(crate) fn persist_messages(
344 &self,
345 conversation: &mut Conversation,
346 agent: &str,
347 created_by: &str,
348 pre_run_len: usize,
349 compact_summary: Option<String>,
350 event_trace: &[wcore::EventLine],
351 ) {
352 self.ensure_handle(conversation, agent, created_by);
353 let Some(ref handle) = conversation.handle else {
354 return;
355 };
356 let storage = self.storage();
357
358 if let Some(summary) = compact_summary {
359 let topic = conversation
362 .topic
363 .clone()
364 .expect("persisted conversation without a topic");
365 if let Some(archive_name) = self.write_archive(&topic, summary) {
368 let _ = storage.append_session_compact(handle, &archive_name);
369 if conversation.history.len() > 1 {
370 let tail: Vec<_> = conversation.history[1..]
371 .iter()
372 .filter(|e| !e.auto_injected)
373 .cloned()
374 .collect();
375 let _ = storage.append_session_messages(handle, &tail);
376 }
377 }
378 } else {
379 let new_entries: Vec<_> = conversation.history[pre_run_len..]
380 .iter()
381 .filter(|e| !e.auto_injected)
382 .cloned()
383 .collect();
384 let _ = storage.append_session_messages(handle, &new_entries);
385 }
386 if !event_trace.is_empty() {
387 let _ = storage.append_session_events(handle, event_trace);
388 }
389 let _ = storage.update_session_meta(handle, &conversation.meta(agent, created_by));
390 }
391
392 pub(crate) fn spawn_title_generation(
393 &self,
394 _conversation_id: u64,
395 agent_name: &str,
396 created_by: &str,
397 conversation_mutex: Arc<Mutex<Conversation>>,
398 ) {
399 let model = self.model.clone();
400 let storage = self.storage().clone();
401 let agent_name = agent_name.to_owned();
402 let created_by = created_by.to_owned();
403 let model_name = self
404 .agents
405 .read()
406 .get(agent_name.as_str())
407 .map(|a| a.config.model.clone())
408 .unwrap_or_default();
409 if model_name.is_empty() {
410 return;
411 }
412 tokio::spawn(async move {
413 let (user_msg, assistant_msg) = {
414 let conversation = conversation_mutex.lock().await;
415 let user = conversation
416 .history
417 .iter()
418 .find(|e| *e.role() == Role::User && !e.auto_injected)
419 .map(|e| e.text().to_owned());
420 let assistant = conversation
421 .history
422 .iter()
423 .find(|e| *e.role() == Role::Assistant)
424 .map(|e| e.text().to_owned());
425 (user, assistant)
426 };
427
428 let Some(user) = user_msg else { return };
429 let Some(assistant) = assistant_msg else {
430 return;
431 };
432
433 let user_snippet: String = user.chars().take(200).collect();
434 let assistant_snippet: String = assistant.chars().take(200).collect();
435
436 let prompt = format!(
437 "Summarize this conversation in 3-6 words as a short title. \
438 Return ONLY the title, nothing else.\n\n\
439 User: {user_snippet}\nAssistant: {assistant_snippet}"
440 );
441
442 let request = ChatCompletionRequest {
443 model: model_name,
444 messages: vec![Message::user(&prompt)],
445 temperature: None,
446 top_p: None,
447 max_tokens: None,
448 stream: None,
449 stop: None,
450 tools: None,
451 tool_choice: None,
452 frequency_penalty: None,
453 presence_penalty: None,
454 seed: None,
455 user: None,
456 reasoning_effort: None,
457 extra: Default::default(),
458 };
459
460 match model.send_ct(request).await {
461 Ok(response) => {
462 if let Some(title) = response.content() {
463 let title = title.trim().trim_matches('"').to_string();
464 if !title.is_empty() {
465 let mut conversation = conversation_mutex.lock().await;
466 if conversation.title.is_empty() {
467 conversation.title = title;
468 if let Some(ref handle) = conversation.handle {
469 let _ = storage.update_session_meta(
470 handle,
471 &conversation.meta(&agent_name, &created_by),
472 );
473 }
474 }
475 }
476 }
477 }
478 Err(e) => {
479 tracing::debug!("title generation failed: {e}");
480 }
481 }
482 });
483 }
484}