use super::Runtime;
use crate::{Config, ConversationHandle};
use anyhow::{Result, bail};
use memory::{EntryKind, Op};
use std::{collections::HashMap, sync::atomic::Ordering};
use wcore::storage::Storage;
#[derive(Default)]
pub struct TopicRouter {
pub(super) by_title: HashMap<String, u64>,
pub(super) active: Option<String>,
pub(super) tmp: Option<u64>,
}
impl TopicRouter {
pub(super) fn active_conversation(&self) -> Option<u64> {
self.active
.as_ref()
.and_then(|t| self.by_title.get(t).copied())
.or(self.tmp)
}
}
#[derive(Debug, Clone, Copy)]
pub struct SwitchOutcome {
pub conversation_id: u64,
pub resumed: bool,
}
impl<C: Config> Runtime<C> {
pub async fn switch_topic(
&self,
agent: &str,
sender: &str,
title: &str,
description: Option<&str>,
) -> Result<SwitchOutcome> {
if !self.has_agent(agent).await {
bail!("agent '{agent}' not registered");
}
if title.is_empty() {
bail!("topic title cannot be empty");
}
let key = (agent.to_owned(), sender.to_owned());
let id = {
let mut topics = self.topics.write().await;
let router = topics.entry(key.clone()).or_default();
if let Some(id) = router.by_title.get(title).copied() {
router.active = Some(title.to_owned());
return Ok(SwitchOutcome {
conversation_id: id,
resumed: true,
});
}
let id = self.next_conversation_id.fetch_add(1, Ordering::Relaxed);
router.by_title.insert(title.to_owned(), id);
router.active = Some(title.to_owned());
id
};
match self
.finalize_switch(agent, sender, title, description, id)
.await
{
Ok(outcome) => Ok(outcome),
Err(e) => {
self.rollback_reservation(&key, title).await;
Err(e)
}
}
}
async fn finalize_switch(
&self,
agent: &str,
sender: &str,
title: &str,
description: Option<&str>,
id: u64,
) -> Result<SwitchOutcome> {
let existing = self.find_session(agent, sender, title);
let resumed = existing.is_some();
if !resumed {
let desc = description.ok_or_else(|| {
anyhow::anyhow!("description required when creating a new topic '{title}'")
})?;
self.ensure_entry(title, desc);
}
let slot = Self::new_slot(id, agent, sender);
{
let mut conversation = slot.inner.lock().await;
conversation.topic = Some(title.to_owned());
match existing {
Some((handle, snapshot)) => {
conversation.history =
self.resumed_history(snapshot.archive.as_deref(), snapshot.history);
conversation.title = snapshot.meta.title;
conversation.uptime_secs = snapshot.meta.uptime_secs;
conversation.handle = Some(handle);
}
None => {
let storage = self.storage();
let handle = storage.create_session(agent, sender)?;
storage.update_session_meta(&handle, &conversation.meta(agent, sender))?;
conversation.handle = Some(handle);
}
}
}
self.conversations.write().await.insert(id, slot);
Ok(SwitchOutcome {
conversation_id: id,
resumed,
})
}
async fn rollback_reservation(&self, key: &(String, String), title: &str) {
let mut topics = self.topics.write().await;
let Some(router) = topics.get_mut(key) else {
return;
};
router.by_title.remove(title);
if router.active.as_deref() == Some(title) {
router.active = None;
}
if router.by_title.is_empty() && router.active.is_none() && router.tmp.is_none() {
topics.remove(key);
}
}
fn find_session(
&self,
agent: &str,
sender: &str,
title: &str,
) -> Option<(ConversationHandle, wcore::storage::SessionSnapshot)> {
let storage = self.storage();
let summaries = storage.list_sessions().ok()?;
let mut best: Option<(ConversationHandle, wcore::storage::ConversationMeta)> = None;
for summary in summaries {
if summary.meta.agent != agent || summary.meta.created_by != sender {
continue;
}
if summary.meta.topic.as_deref() != Some(title) {
continue;
}
if best
.as_ref()
.is_none_or(|(_, meta)| summary.meta.created_at > meta.created_at)
{
best = Some((summary.handle, summary.meta));
}
}
let (handle, _) = best?;
let snapshot = storage.load_session(&handle).ok().flatten()?;
Some((handle, snapshot))
}
fn ensure_entry(&self, title: &str, description: &str) {
let mut mem = self.memory.write();
if mem.get(title).is_some() {
return;
}
if let Err(e) = mem.apply(Op::Add {
name: title.to_owned(),
content: description.to_owned(),
aliases: vec![],
kind: EntryKind::Topic,
}) {
tracing::warn!("topic entry write failed: {e}");
}
}
}