//! Persistent memory tools and conversation storage.
//!
//! This module stores conversations, resources, artifacts, and memory-management
//! commands for agents. Conversation metadata is stored in AndaDB collections
//! with BTree and BM25 indexes, while higher-level memory operations are exposed
//! through KIP (Knowledge Interaction Protocol) tools backed by the Cognitive
//! Nexus.
//!
//! The KIP, resource, and conversation tools report the same capability group
//! via [`memory_tool_group_info`], so the discovery layer presents them to the
//! model as one persistent-memory bundle.
use anda_cognitive_nexus::{CognitiveNexus, ConceptPK};
use anda_core::{
BoxError, ContentPart, Document, Documents, FunctionDefinition, Message, Resource, ResourceRef,
StateFeatures, Tool, ToolGroupInfo, ToolOutput, Usage, Xid, gen_schema_for,
};
use anda_db::{
collection::{Collection, CollectionConfig},
database::AndaDB,
error::DBError,
index::BTree,
query::{Filter, Query, RangeQuery, Search},
};
use anda_db_schema::{
AndaDBSchema, FieldEntry, FieldKey, FieldType, Ft, Fv, Json, Schema, SchemaError,
};
use anda_db_tfs::jieba_tokenizer;
use anda_kip::{
DescribeTarget, KipError, META_SYSTEM_NAME, MetaCommand, PERSON_TYPE, Request, Response,
};
use candid::Principal;
use cbor2::cbor;
use ic_auth_types::ByteBufB64;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Map, json};
use std::{
collections::{BTreeMap, HashMap},
fmt,
sync::{Arc, LazyLock},
};
use crate::{context::BaseCtx, extension::fetch::FetchWebResourcesTool, rfc3339_datetime, unix_ms};
/// Stable id of the persistent memory capability group.
pub const MEMORY_TOOL_GROUP_ID: &str = "memory";
/// Returns the shared [`ToolGroupInfo`] for the persistent memory tools.
///
/// The KIP execution, resource, and conversation tools all report this so the
/// registry presents them as one bundle. The registry fills in the member list
/// from the tools actually registered.
pub fn memory_tool_group_info() -> ToolGroupInfo {
ToolGroupInfo {
id: MEMORY_TOOL_GROUP_ID.to_string(),
title: "Persistent memory".to_string(),
description: "Store and recall long-term knowledge and past conversations from the agent's persistent memory (Cognitive Nexus + conversation store).".to_string(),
instructions: Some(
"These tools share one persistent memory backend. Use the KIP execution tool (`execute_kip` / `execute_kip_readonly`) to query or update the Cognitive Nexus knowledge graph; prefer the read-only variant for retrieval. Use `list_previous_conversations` and `search_conversations` to recall past dialogue, `get_resource_content` to fetch a stored resource by id, and `memory_api` for unified conversation management (stop/steer/follow-up/delete, logs). Retrieve context before acting, and write durable facts back so they survive across sessions.".to_string(),
),
}
}
/// Default KIP tool function definition used by [`MemoryManagement`].
pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
serde_json::from_value(json!({
"name": "execute_kip",
"description": "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to interact with your persistent memory.",
"parameters": {
"type": "object",
"properties": {
"commands": {
"type": "array",
"description": "An array of KIP commands for batch execution (reduces round-trips). Commands are executed sequentially; execution stops on first KML error.",
"items": {
"type": "string"
}
},
"parameters": {
"type": "object",
"description": "An optional JSON object of key-value pairs used for safe substitution of placeholders in the command string(s). Placeholders should start with ':' (e.g., :name, :limit). IMPORTANT: A placeholder must represent a complete JSON value token (e.g., name: :name). Do not embed placeholders inside quoted strings (e.g., \"Hello :name\"), because substitution uses JSON serialization."
},
},
"required": ["commands", "parameters"],
"additionalProperties": false
}
})).unwrap()
});
/// Conversation record stored in the memory database.
///
/// Schema version: 4
#[derive(Debug, Clone, Deserialize, Serialize, AndaDBSchema)]
pub struct Conversation {
/// Unique collection identifier assigned by AndaDB.
pub _id: u64,
/// Principal that owns the conversation.
#[field_type = "Bytes"]
pub user: Principal,
/// Optional thread identifier used to group related conversation turns.
#[field_type = "Option<Bytes>"]
#[serde(skip_serializing_if = "Option::is_none")]
pub thread: Option<Xid>,
/// Serialized chat messages accumulated for the conversation.
pub messages: Vec<Json>,
/// The request resources used by the agent to process the conversation.
pub resources: Vec<Resource>,
/// A collection of artifacts generated by the agent during the execution of the task.
pub artifacts: Vec<Resource>,
/// Current lifecycle state of the conversation.
#[field_type = "Text"]
pub status: ConversationStatus,
/// Failure reason recorded when the conversation did not complete.
#[serde(skip_serializing_if = "Option::is_none")]
pub failed_reason: Option<String>,
/// The LLM usage statistics for the conversation.
#[field_type = "Map<String, U64>"]
pub usage: Usage,
/// Messages queued to interrupt the agent mid-run.
///
/// They are delivered after the current tool execution and skip remaining
/// pending tools.
#[serde(skip_serializing_if = "Option::is_none")]
pub steering_messages: Option<Vec<String>>,
/// Follow-up messages queued for the agent's next safe user turn.
///
/// They are delivered with the current pending tool-call results when they
/// finish, or at the next idle boundary when no tools are pending. Steering
/// still takes priority.
#[serde(skip_serializing_if = "Option::is_none")]
pub follow_up_messages: Option<Vec<String>>,
/// The child conversation ID, if this conversation has been continued. Should not be updated after set.
#[serde(skip_serializing_if = "Option::is_none")]
pub child: Option<u64>,
/// The ancestor conversation IDs, ordered from root to parent.
/// Should not be updated after creation.
#[serde(skip_serializing_if = "Option::is_none")]
pub ancestors: Option<Vec<u64>>,
/// An optional label for the conversation, which can be used for categorization or retrieval.
#[serde(skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
/// Extra information for future extensions.
///
/// This field is not indexed and should not be used for filtering or
/// searching.
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
/// The period when the conversation was created, in hours (timestamp / 3600 / 1000).
/// It is used to index the conversation for faster retrieval by time.
pub period: u64,
/// The timestamp when the conversation was created, in milliseconds.
pub created_at: u64,
/// The timestamp when the conversation was updated, in milliseconds.
pub updated_at: u64,
}
impl Default for Conversation {
fn default() -> Self {
Self {
_id: 0,
user: Principal::anonymous(),
thread: None,
messages: Vec::new(),
resources: Vec::new(),
artifacts: Vec::new(),
status: ConversationStatus::default(),
failed_reason: None,
usage: Usage::default(),
steering_messages: None,
follow_up_messages: None,
child: None,
ancestors: None,
label: None,
extra: None,
period: 0,
created_at: 0,
updated_at: 0,
}
}
}
impl Conversation {
/// Appends messages to the serialized chat history.
pub fn append_messages(&mut self, message: Vec<Message>) {
self.messages.extend(message.into_iter().map(|v| json!(v)));
}
/// Converts mutable conversation fields into AndaDB update values.
pub fn to_changes(&self) -> Result<BTreeMap<String, Fv>, BoxError> {
let messages = cbor!(self.messages).map_err(|err| format!("encode messages: {err}"))?;
let resources = cbor!(self.resources).map_err(|err| format!("encode resources: {err}"))?;
let artifacts = cbor!(self.artifacts).map_err(|err| format!("encode artifacts: {err}"))?;
let usage = cbor!(self.usage).map_err(|err| format!("encode usage: {err}"))?;
let mut changes = BTreeMap::from([
(
"messages".to_string(),
Fv::array_from(messages, &[Ft::Json])?,
),
(
"resources".to_string(),
Fv::array_from(resources, &[Resource::field_type()])?,
),
(
"artifacts".to_string(),
Fv::array_from(artifacts, &[Resource::field_type()])?,
),
("status".to_string(), Fv::Text(self.status.to_string())),
(
"usage".to_string(),
Fv::map_from(usage, &BTreeMap::from([("*".into(), Ft::U64)]))?,
),
("updated_at".to_string(), Fv::U64(self.updated_at)),
(
"steering_messages".to_string(),
if let Some(msg) = self.steering_messages.clone() {
msg.into()
} else {
Fv::Null
},
),
(
"follow_up_messages".to_string(),
if let Some(msg) = self.follow_up_messages.clone() {
msg.into()
} else {
Fv::Null
},
),
(
"label".to_string(),
if let Some(label) = self.label.clone() {
label.into()
} else {
Fv::Null
},
),
(
"extra".to_string(),
if let Some(extra) = self.extra.clone() {
extra.into()
} else {
Fv::Null
},
),
]);
if let Some(child) = self.child {
changes.insert("child".to_string(), Fv::U64(child));
}
if let Some(reason) = &self.failed_reason {
changes.insert("failed_reason".to_string(), Fv::Text(reason.clone()));
}
Ok(changes)
}
/// Builds an incremental delta from borrowed conversation data.
pub fn to_delta(&self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
ConversationDelta {
_id: self._id,
messages: self
.messages
.iter()
.skip(messages_offset)
.cloned()
.collect(),
artifacts: self
.artifacts
.iter()
.skip(artifacts_offset)
.cloned()
.collect(),
status: self.status.clone(),
usage: self.usage.clone(),
failed_reason: self.failed_reason.clone(),
updated_at: self.updated_at,
child: self.child,
}
}
/// Builds an incremental delta while consuming the conversation.
pub fn into_delta(self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
ConversationDelta {
_id: self._id,
messages: self.messages.into_iter().skip(messages_offset).collect(),
artifacts: self.artifacts.into_iter().skip(artifacts_offset).collect(),
status: self.status,
usage: self.usage,
failed_reason: self.failed_reason,
updated_at: self.updated_at,
child: self.child,
}
}
}
/// Message view used when embedding conversations for search and recall.
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct PrunedMessage {
/// Message role.
pub role: String,
/// Visible or reasoning content retained after pruning.
pub content: Vec<ContentPart>,
/// Optional participant name.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Optional sender principal string.
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
/// Optional RFC 3339 timestamp string.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
}
impl PrunedMessage {
/// Prunes non-visible content from a message and converts metadata to strings.
pub fn try_from(mut msg: Message) -> Option<Self> {
msg.prune_content();
Some(Self {
role: msg.role,
content: msg.content,
name: msg.name,
user: msg.user.map(|u| u.to_string()),
timestamp: msg.timestamp.and_then(rfc3339_datetime),
})
}
}
impl From<Conversation> for Document {
fn from(conversation: Conversation) -> Self {
let mut metadata = BTreeMap::from([
("_id".to_string(), conversation._id.into()),
("type".to_string(), "Conversation".into()),
("user".to_string(), conversation.user.to_string().into()),
("status".to_string(), conversation.status.to_string().into()),
]);
if let Some(created_at) = rfc3339_datetime(conversation.created_at) {
metadata.insert("created_at".to_string(), created_at.into());
}
if let Some(updated_at) = rfc3339_datetime(conversation.updated_at) {
metadata.insert("updated_at".to_string(), updated_at.into());
}
if let Some(thread) = conversation.thread {
metadata.insert("thread".to_string(), thread.to_string().into());
}
if let Some(label) = conversation.label {
metadata.insert("label".to_string(), label.into());
}
let message: Vec<PrunedMessage> = conversation
.messages
.iter()
.filter_map(|v| {
serde_json::from_value::<Message>(v.clone())
.ok()
.and_then(PrunedMessage::try_from)
})
.collect();
Self {
content: serde_json::to_value(message).unwrap_or_default(),
metadata,
}
}
}
/// Borrowed view of a conversation for insertion into AndaDB.
#[derive(Debug, Serialize)]
pub struct ConversationRef<'a> {
/// Conversation ID. `0` lets AndaDB assign a new ID.
pub _id: u64,
/// Principal that owns the conversation.
pub user: &'a Principal,
/// Optional thread identifier.
pub thread: Option<&'a Xid>,
/// Serialized message history.
pub messages: &'a [Json],
/// Input resources attached to the conversation.
pub resources: &'a [Resource],
/// Artifacts produced by the conversation.
pub artifacts: &'a [Resource],
/// Current conversation lifecycle status.
pub status: &'a ConversationStatus,
/// Accumulated model and tool usage.
pub usage: &'a Usage,
/// Pending steering messages.
#[serde(skip_serializing_if = "Option::is_none")]
pub steering_messages: &'a Option<Vec<String>>,
/// Pending follow-up messages.
#[serde(skip_serializing_if = "Option::is_none")]
pub follow_up_messages: &'a Option<Vec<String>>,
/// Optional conversation label.
#[serde(skip_serializing_if = "Option::is_none")]
pub label: &'a Option<String>,
/// Extra unindexed metadata.
pub extra: &'a Option<Json>,
/// Hour bucket used for expiration scans.
pub period: u64,
/// Creation timestamp in milliseconds.
pub created_at: u64,
/// Last update timestamp in milliseconds.
pub updated_at: u64,
/// Child continuation conversation ID.
#[serde(skip_serializing_if = "Option::is_none")]
pub child: &'a Option<u64>,
/// Ancestor conversation IDs from root to parent.
#[serde(skip_serializing_if = "Option::is_none")]
pub ancestors: &'a Option<Vec<u64>>,
}
impl<'a> From<&'a Conversation> for ConversationRef<'a> {
fn from(conversation: &'a Conversation) -> Self {
Self {
_id: conversation._id,
user: &conversation.user,
thread: conversation.thread.as_ref(),
messages: &conversation.messages,
resources: &conversation.resources,
artifacts: &conversation.artifacts,
status: &conversation.status,
usage: &conversation.usage,
steering_messages: &conversation.steering_messages,
follow_up_messages: &conversation.follow_up_messages,
label: &conversation.label,
extra: &conversation.extra,
period: conversation.period,
created_at: conversation.created_at,
updated_at: conversation.updated_at,
child: &conversation.child,
ancestors: &conversation.ancestors,
}
}
}
/// Lightweight conversation state returned by management APIs.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConversationState {
/// Conversation ID.
pub _id: u64,
/// Current lifecycle status.
pub status: ConversationStatus,
}
impl From<&ConversationRef<'_>> for ConversationState {
fn from(conversation: &ConversationRef<'_>) -> Self {
Self {
_id: conversation._id,
status: conversation.status.clone(),
}
}
}
impl From<&Conversation> for ConversationState {
fn from(conversation: &Conversation) -> Self {
Self {
_id: conversation._id,
status: conversation.status.clone(),
}
}
}
/// A delta of a conversation since a given offset, used for incremental fetching of conversation messages.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConversationDelta {
/// Conversation ID.
pub _id: u64,
/// The new messages since the given offset. The offset is determined by the client and is not stored in the database. It is used to support incremental fetching of conversation messages.
pub messages: Vec<Json>,
/// New artifacts since the requested offset.
pub artifacts: Vec<Resource>,
/// Current lifecycle status.
pub status: ConversationStatus,
/// Accumulated usage at the time the delta was read.
pub usage: Usage,
/// Failure reason when the conversation failed.
pub failed_reason: Option<String>,
/// Last update timestamp in milliseconds.
pub updated_at: u64,
/// Child continuation conversation ID, when present.
pub child: Option<u64>,
}
/// Conversation lifecycle state.
#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ConversationStatus {
/// Conversation has been submitted but not yet picked up by a runner.
#[default]
Submitted,
/// Runner is actively processing the conversation.
Working,
/// Runner is idle and can accept follow-up input.
Idle,
/// Conversation finished successfully.
Completed,
/// Conversation was cancelled.
Cancelled,
/// Conversation failed.
Failed,
}
impl fmt::Display for ConversationStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConversationStatus::Submitted => write!(f, "submitted"),
ConversationStatus::Working => write!(f, "working"),
ConversationStatus::Idle => write!(f, "idle"),
ConversationStatus::Completed => write!(f, "completed"),
ConversationStatus::Cancelled => write!(f, "cancelled"),
ConversationStatus::Failed => write!(f, "failed"),
}
}
}
/// Storage wrapper for the conversations collection.
#[derive(Debug, Clone)]
pub struct Conversations {
/// Underlying AndaDB collection.
pub conversations: Arc<Collection>,
}
impl Conversations {
/// Opens or creates the conversations collection and indexes.
pub async fn connect(db: Arc<AndaDB>, name: String) -> Result<Self, BoxError> {
let mut schema = Conversation::schema()?;
schema.with_version(4);
let conversations = db
.open_or_create_collection(
schema,
CollectionConfig {
name,
description: "conversations collection".to_string(),
},
async |collection| {
// set tokenizer
collection.set_tokenizer(jieba_tokenizer());
// create BTree indexes if not exists
collection.create_btree_index_nx(&["user"]).await?;
collection.create_btree_index_nx(&["thread"]).await?;
collection.create_btree_index_nx(&["period"]).await?;
collection
.create_bm25_index_nx(&["messages", "resources", "artifacts"])
.await?;
Ok::<(), DBError>(())
},
)
.await?;
Ok(Self { conversations })
}
/// Adds a conversation and flushes the collection.
pub async fn add_conversation(
&self,
conversation: ConversationRef<'_>,
) -> Result<u64, DBError> {
let id = self.conversations.add_from(&conversation).await?;
self.conversations.flush(unix_ms()).await?;
Ok(id)
}
/// Updates selected conversation fields and flushes the collection.
pub async fn update_conversation(
&self,
id: u64,
fields: BTreeMap<String, Fv>,
) -> Result<(), DBError> {
self.conversations.update(id, fields).await?;
self.conversations.flush(unix_ms()).await?;
Ok(())
}
/// Retrieves a conversation by ID.
pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
self.conversations.get_as(id).await
}
/// Deletes a conversation by ID and returns whether it existed.
pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
let doc = self.conversations.remove(id).await?;
self.conversations.flush(unix_ms()).await?;
Ok(doc.is_some())
}
/// Retrieves the user's conversations matching `ids`.
///
/// The underlying query limit is capped at 1000 IDs per call.
pub async fn batch_get_conversations(
&self,
user: &Principal,
ids: Vec<u64>,
) -> Result<Vec<Conversation>, BoxError> {
if ids.is_empty() {
return Ok(Vec::new());
}
// `limit: None` falls back to the database default (10); request the full batch instead.
let limit = ids.len();
let filter = Some(Filter::And(vec![
Box::new(Filter::Field((
"_id".to_string(),
RangeQuery::Include(ids.into_iter().map(Fv::U64).collect()),
))),
Box::new(Filter::Field((
"user".to_string(),
RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
))),
]));
let rt: Vec<Conversation> = self
.conversations
.search_as(Query {
search: None,
filter,
limit: Some(limit),
})
.await?;
Ok(rt)
}
/// Lists the user's conversations, newest first, with cursor-based pagination.
pub async fn list_conversations_by_user(
&self,
user: &Principal,
cursor: Option<String>,
limit: Option<usize>,
) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
// 0 means "no limit" to the database, and an empty page would panic below; clamp instead.
let limit = limit.unwrap_or(10).clamp(1, 100);
let cursor = match BTree::from_cursor::<u64>(&cursor)? {
Some(cursor) => cursor,
None => self.conversations.max_document_id() + 1,
};
let filter = Some(Filter::And(vec![
Box::new(Filter::Field((
"user".to_string(),
RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
))),
Box::new(Filter::Field((
"_id".to_string(),
RangeQuery::Lt(Fv::U64(cursor)),
))),
]));
let mut rt: Vec<Conversation> = self
.conversations
.search_as(Query {
search: None,
filter,
limit: Some(limit),
})
.await?;
// The page holds the newest matching conversations; the next cursor is the smallest ID,
// so the following page fetches strictly older ones.
let cursor = if rt.len() >= limit {
rt.iter()
.map(|conversation| conversation._id)
.min()
.and_then(|id| BTree::to_cursor(&id))
} else {
None
};
rt.sort_by_key(|conversation| std::cmp::Reverse(conversation._id));
Ok((rt, cursor))
}
/// Searches a user's conversations with the BM25 conversation index.
pub async fn search_conversations(
&self,
user: &Principal,
query: String,
limit: Option<usize>,
) -> Result<Vec<Conversation>, BoxError> {
let limit = limit.unwrap_or(10).clamp(1, 100);
let rt = self
.conversations
.search_as(Query {
search: Some(Search {
text: Some(query),
logical_search: true,
..Default::default()
}),
filter: Some(Filter::Field((
"user".to_string(),
RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
))),
limit: Some(limit),
})
.await?;
Ok(rt)
}
/// Deletes all conversations created before `timestamp` (in milliseconds).
pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
let period = timestamp / 3600 / 1000;
let mut count = 0u64;
loop {
let ids = next_expired_batch(&self.conversations, period).await?;
if ids.is_empty() {
break;
}
let mut removed = 0u64;
for id in ids {
if matches!(self.conversations.remove(id).await, Ok(Some(_))) {
removed += 1;
}
}
count += removed;
if removed == 0 {
// Nothing was removable; stop instead of spinning on undeletable documents.
break;
}
}
self.conversations.flush(unix_ms()).await?;
Ok(count)
}
}
/// The maximum number of expired conversations fetched per deletion batch. The database caps
/// query limits at 1000.
const DELETE_EXPIRED_BATCH: usize = 1000;
/// Returns the next batch of conversation IDs whose `period` is older than `period`.
async fn next_expired_batch(conversations: &Collection, period: u64) -> Result<Vec<u64>, BoxError> {
let ids = conversations
.search_ids(Query {
search: None,
filter: Some(Filter::Field((
"period".to_string(),
RangeQuery::Lt(Fv::U64(period)),
))),
limit: Some(DELETE_EXPIRED_BATCH),
})
.await?;
Ok(ids)
}
/// High-level memory manager for conversations, resources, and the Cognitive Nexus.
#[derive(Debug, Clone)]
pub struct MemoryManagement {
/// Shared Cognitive Nexus used for KIP execution.
pub nexus: Arc<CognitiveNexus>,
/// Conversation collection.
pub conversations: Arc<Collection>,
/// Resource collection.
pub resources: Arc<Collection>,
/// Function definition exposed for the writable KIP tool.
pub kip_function_definitions: FunctionDefinition,
}
impl MemoryManagement {
/// Opens or creates all memory collections and connects them to a nexus.
pub async fn connect(db: Arc<AndaDB>, nexus: Arc<CognitiveNexus>) -> Result<Self, BoxError> {
let conversations = Conversations::connect(db.clone(), "conversations".to_string())
.await?
.conversations;
let schema = Resource::schema()?;
let resources = db
.open_or_create_collection(
schema,
CollectionConfig {
name: "resources".to_string(),
description: "Resources collection".to_string(),
},
async |collection| {
// set tokenizer
collection.set_tokenizer(jieba_tokenizer());
// create BTree indexes if not exists
collection.create_btree_index_nx(&["tags"]).await?;
collection.create_btree_index_nx(&["hash"]).await?;
collection.create_btree_index_nx(&["mime_type"]).await?;
collection
.create_bm25_index_nx(&["name", "description", "metadata"])
.await?;
Ok::<(), DBError>(())
},
)
.await?;
Ok(Self {
nexus,
conversations,
resources,
kip_function_definitions: FUNCTION_DEFINITION.clone(),
})
}
/// Overrides the writable KIP tool definition.
pub fn with_kip_function_definitions(mut self, def: FunctionDefinition) -> Self {
self.kip_function_definitions = def;
self
}
/// Returns the shared Cognitive Nexus handle.
pub fn nexus(&self) -> Arc<CognitiveNexus> {
self.nexus.clone()
}
/// Views the conversations collection through the shared [`Conversations`] API.
fn as_conversations(&self) -> Conversations {
Conversations {
conversations: self.conversations.clone(),
}
}
/// Returns the largest conversation document ID currently known.
pub fn max_conversation_id(&self) -> u64 {
self.conversations.max_document_id()
}
/// Describes the Cognitive Nexus primer.
pub async fn describe_primer(&self) -> Result<Json, KipError> {
let (primer, _) = self
.nexus
.execute_meta(MetaCommand::Describe(DescribeTarget::Primer))
.await?;
Ok(primer)
}
/// Describes the system identity and known domains.
pub async fn describe_system(&self) -> Result<Json, KipError> {
let system = self
.nexus
.get_concept(&ConceptPK::Object {
r#type: PERSON_TYPE.to_string(),
name: META_SYSTEM_NAME.to_string(),
})
.await?;
let (domains, _) = self
.nexus
.execute_meta(MetaCommand::Describe(DescribeTarget::Domains))
.await?;
Ok(json!({
"identity": system.to_concept_node(),
"domains": domains,
}))
}
/// Describes the caller identity stored in the nexus.
pub async fn describe_caller(&self, id: &Principal) -> Result<Json, KipError> {
let user = self
.nexus
.get_concept(&ConceptPK::Object {
r#type: PERSON_TYPE.to_string(),
name: id.to_string(),
})
.await?;
Ok(user.to_concept_node())
}
/// Gets or initializes the caller identity concept in the nexus.
pub async fn get_or_init_caller(
&self,
id: &Principal,
name: Option<String>,
) -> Result<Json, KipError> {
let mut attributes = Map::new();
let mut metadata = Map::new();
attributes.insert("id".to_string(), id.to_string().into());
attributes.insert("person_class".to_string(), "Human".into());
if let Some(name) = name {
attributes.insert("name".to_string(), name.into());
}
metadata.insert("author".to_string(), "$system".into());
metadata.insert("status".to_string(), "active".into());
let user = self
.nexus
.get_or_init_concept(
PERSON_TYPE.to_string(),
id.to_string(),
attributes,
metadata,
)
.await?;
Ok(user.to_concept_node())
}
/// Adds one resource reference and flushes the resource collection.
pub async fn add_resource(&self, resource: ResourceRef<'_>) -> Result<u64, DBError> {
let id = self.resources.add_from(&resource).await?;
self.resources.flush(unix_ms()).await?;
Ok(id)
}
/// Adds resources when needed and returns resource references without blobs.
pub async fn try_add_resources(
&self,
resources: &[Resource],
) -> Result<Vec<Resource>, BoxError> {
let mut rs: Vec<Resource> = Vec::with_capacity(resources.len());
let mut count = 0;
for r in resources.iter() {
let rf: ResourceRef = r.into();
let id = if r._id > 0 {
// Stored resources carry no owner, so only existence can be verified here;
// rejecting unknown IDs keeps conversations free of dangling references.
if !self.resources.contains(r._id) {
return Err(format!("resource {} does not exist", r._id).into());
}
r._id
} else {
match self.resources.add_from(&rf).await {
Ok(id) => {
count += 1;
id
}
Err(DBError::AlreadyExists { _id, .. }) => _id,
Err(err) => Err(err)?,
}
};
let r2 = Resource {
_id: id,
blob: None,
..r.clone()
};
rs.push(r2)
}
if count > 0 {
self.resources.flush(unix_ms()).await?;
}
Ok(rs)
}
/// Retrieves a resource by ID.
pub async fn get_resource(&self, id: u64) -> Result<Resource, DBError> {
self.resources.get_as(id).await
}
/// Adds a conversation through the shared conversations API.
pub async fn add_conversation(
&self,
conversation: ConversationRef<'_>,
) -> Result<u64, DBError> {
self.as_conversations().add_conversation(conversation).await
}
/// Updates selected conversation fields through the shared conversations API.
pub async fn update_conversation(
&self,
id: u64,
fields: BTreeMap<String, Fv>,
) -> Result<(), DBError> {
self.as_conversations()
.update_conversation(id, fields)
.await
}
/// Retrieves a conversation by ID.
pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
self.as_conversations().get_conversation(id).await
}
/// Deletes a conversation by ID.
pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
self.as_conversations().delete_conversation(id).await
}
/// Lists a user's conversations with cursor-based pagination.
pub async fn list_conversations_by_user(
&self,
user: &Principal,
cursor: Option<String>,
limit: Option<usize>,
) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
self.as_conversations()
.list_conversations_by_user(user, cursor, limit)
.await
}
/// Searches a user's conversations by text query.
pub async fn search_conversations(
&self,
user: &Principal,
query: String,
limit: Option<usize>,
) -> Result<Vec<Conversation>, BoxError> {
self.as_conversations()
.search_conversations(user, query, limit)
.await
}
/// Deletes all conversations created before `timestamp` (in milliseconds), together with the
/// resources and artifacts they reference.
pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
let period = timestamp / 3600 / 1000;
let mut count = 0u64;
loop {
let ids = next_expired_batch(&self.conversations, period).await?;
if ids.is_empty() {
break;
}
let mut removed = 0u64;
for id in ids {
if let Ok(Some(doc)) = self.conversations.remove(id).await {
removed += 1;
if let Ok(conversation) = doc.try_into::<Conversation>() {
for resource in conversation
.resources
.into_iter()
.chain(conversation.artifacts)
{
if resource._id > 0 {
let _ = self.resources.remove(resource._id).await;
}
}
}
}
}
count += removed;
if removed == 0 {
// Nothing was removable; stop instead of spinning on undeletable documents.
break;
}
}
let now_ms = unix_ms();
self.conversations.flush(now_ms).await?;
self.resources.flush(now_ms).await?;
Ok(count)
}
}
/// KIP tool for memory management
impl Tool<BaseCtx> for MemoryManagement {
type Args = Request;
type Output = Response;
fn name(&self) -> String {
self.kip_function_definitions.name.clone()
}
fn description(&self) -> String {
self.kip_function_definitions.description.clone()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
self.kip_function_definitions.clone()
}
async fn call(
&self,
_ctx: BaseCtx,
request: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let (_, res) = request.execute(self.nexus.as_ref()).await;
Ok(ToolOutput {
is_error: if matches!(res, Response::Err { .. }) {
Some(true)
} else {
None
},
output: res,
artifacts: Vec::new(),
usage: Usage::default(),
tools_usage: HashMap::new(),
})
}
}
/// A read-only version of the KIP tool for memory management, which does not allow any modifications to the memory and is safe to use for retrieval operations.
#[derive(Debug, Clone)]
pub struct MemoryReadonly {
memory: Arc<MemoryManagement>,
}
impl MemoryReadonly {
/// Function name used when registering the read-only KIP tool.
pub const NAME: &'static str = "execute_kip_readonly";
/// Creates a new MemoryReadonly instance
pub fn new(memory: Arc<MemoryManagement>) -> Self {
Self { memory }
}
}
impl Tool<BaseCtx> for MemoryReadonly {
type Args = Request;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to read from your persistent memory. This tool does not allow any modifications to the memory and is safe to use for retrieval operations.".to_string()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description(),
parameters: self.memory.kip_function_definitions.parameters.clone(),
strict: Some(true),
}
}
async fn call(
&self,
_ctx: BaseCtx,
mut request: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let (_, res) = request.readonly().execute(self.memory.nexus.as_ref()).await;
Ok(ToolOutput {
is_error: if matches!(res, Response::Err { .. }) {
Some(true)
} else {
None
},
output: res,
artifacts: Vec::new(),
usage: Usage::default(),
tools_usage: HashMap::new(),
})
}
}
/// Arguments for "get_resource_content" tool
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct GetResourceContentArgs {
/// The ID of the resource to get
pub _id: u64,
}
/// Tool that retrieves the full content for a stored resource.
#[derive(Debug, Clone)]
pub struct GetResourceContentTool {
memory: Arc<MemoryManagement>,
schema: Json,
}
impl GetResourceContentTool {
/// Function name used when registering the resource-content tool.
pub const NAME: &'static str = "get_resource_content";
/// Creates a new GetResourceContentTool instance
pub fn new(memory: Arc<MemoryManagement>) -> Self {
let schema = gen_schema_for::<GetResourceContentArgs>();
Self { memory, schema }
}
}
impl Tool<BaseCtx> for GetResourceContentTool {
type Args = GetResourceContentArgs;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"Retrieves the full content of a stored resource by its ID. Returns the content as plain text if UTF-8 encoded, or as a base64url-encoded string for binary data. If the resource has no local blob but has a URI, it will be fetched from the remote source.".to_string()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description(),
parameters: self.schema.clone(),
strict: Some(true),
}
}
async fn call(
&self,
ctx: BaseCtx,
args: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let res = self.memory.get_resource(args._id).await?;
let text = match res.blob {
Some(blob) => match String::from_utf8(blob.0) {
Ok(s) => s,
Err(e) => ByteBufB64(e.into_bytes()).to_string(),
},
None => match res.uri {
Some(uri) => FetchWebResourcesTool::fetch_as_text(&ctx, &uri).await?,
None => Err(format!("Invalid resource {}, no blob or uri", args._id))?,
},
};
Ok(ToolOutput::new(Response::ok(text.into())))
}
}
/// Arguments for "list_previous_conversations" tool
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct ListConversationsArgs {
/// The cursor for pagination
#[serde(default)]
pub cursor: String,
/// The limit for pagination, max 100
#[serde(default)]
pub limit: usize,
}
/// Tool that lists previous conversations for the current caller.
#[derive(Debug, Clone)]
pub struct ListConversationsTool {
conversations: Conversations,
description: String,
}
impl ListConversationsTool {
/// Function name used when registering the conversation-list tool.
pub const NAME: &'static str = "list_previous_conversations";
/// Creates a new ListConversationsTool instance
pub fn new(conversations: Conversations) -> Self {
Self { conversations, description: "Lists the current user's previous conversations in reverse chronological order with cursor-based pagination. Returns conversation metadata including status, timestamps, messages, and usage statistics. Use the cursor parameter to paginate through older conversations.".to_string() }
}
/// Overrides the function description exposed to the model.
pub fn with_description(mut self, description: String) -> Self {
self.description = description;
self
}
}
impl Tool<BaseCtx> for ListConversationsTool {
type Args = ListConversationsArgs;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
self.description.clone()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description.clone(),
parameters: json!({
"type": "object",
"properties": {
"cursor": {
"type": "string",
"description": "The cursor for pagination, returned from the previous call. Use an empty string for the first page."
},
"limit": {
"type": "integer",
"description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
}
},
"required": ["cursor", "limit"],
"additionalProperties": false
}),
strict: Some(true),
}
}
async fn call(
&self,
ctx: BaseCtx,
args: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let (conversations, next_cursor) = self
.conversations
.list_conversations_by_user(
ctx.caller(),
if args.cursor.is_empty() {
None
} else {
Some(args.cursor)
},
if args.limit == 0 {
None
} else {
Some(args.limit)
},
)
.await?;
let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
Ok(ToolOutput::new(Response::Ok {
result: Documents::from(docs).to_string().into(),
next_cursor,
}))
}
}
/// Arguments for "search_conversations" tool
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct SearchConversationsArgs {
/// The query string to search
pub query: String,
/// The max number of conversations to return, max 100
#[serde(default)]
pub limit: usize,
}
/// Tool that searches previous conversations for the current caller.
#[derive(Debug, Clone)]
pub struct SearchConversationsTool {
conversations: Conversations,
description: String,
}
impl SearchConversationsTool {
/// Function name used when registering the conversation-search tool.
pub const NAME: &'static str = "search_conversations";
/// Creates a new SearchConversationsTool instance
pub fn new(conversations: Conversations) -> Self {
Self { conversations, description: "Performs a full-text search across the current user's conversation history using a query string. Searches through messages, resources, and artifacts to find relevant past conversations. Use this to recall specific topics, instructions, or context from previous interactions.".to_string() }
}
/// Overrides the function description exposed to the model.
pub fn with_description(mut self, description: String) -> Self {
self.description = description;
self
}
}
impl Tool<BaseCtx> for SearchConversationsTool {
type Args = SearchConversationsArgs;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
self.description.clone()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description.clone(),
parameters: json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query string to search for in the conversation history."
},
"limit": {
"type": "integer",
"description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
}
},
"required": ["query", "limit"],
"additionalProperties": false
}),
strict: Some(true),
}
}
async fn call(
&self,
ctx: BaseCtx,
args: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let conversations = self
.conversations
.search_conversations(
ctx.caller(),
args.query,
if args.limit == 0 {
None
} else {
Some(args.limit)
},
)
.await?;
let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
Ok(ToolOutput::new(Response::Ok {
result: Documents::from(docs).to_string().into(),
next_cursor: None,
}))
}
}
/// Arguments for "memory_api" tool
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum MemoryToolArgs {
/// Get one resource from a conversation owned by the caller.
GetResource {
/// The ID of the resource to get
_id: u64,
/// The ID of the conversation where the resource is located
conversation: u64,
},
/// Get a conversation by ID
GetConversation {
/// The ID of the conversation to get
_id: u64,
},
/// Get a conversation delta by message and artifact offsets.
GetConversationDelta {
/// The ID of the conversation to get
_id: u64,
/// The messages offset for the conversation delta
messages_offset: usize,
/// The artifacts offset for the conversation delta
artifacts_offset: usize,
},
/// Stop an in-progress conversation.
StopConversation {
/// The ID of the conversation to stop
_id: u64,
},
/// Interrupt a conversation with a steering message.
SteerConversation {
/// The ID of the conversation to steer
_id: u64,
/// The steering message to interrupt the agent mid-run, delivered after current tool execution, skips remaining tools.
message: String,
},
/// Queue a follow-up message for the next safe turn.
FollowUpConversation {
/// The ID of the conversation to follow up
_id: u64,
/// The follow-up message to queue for the agent's next safe user turn.
message: String,
},
/// Delete a conversation owned by the caller.
DeleteConversation {
/// The ID of the conversation to delete
_id: u64,
},
/// List previous conversations
ListPrevConversations {
/// The cursor for pagination
cursor: Option<String>,
/// The limit for pagination, default to 10
limit: Option<usize>,
},
/// Search conversations
SearchConversations {
/// The query string to search
query: String,
/// The max number of conversations to return, default to 10
limit: Option<usize>,
},
}
/// A tool for conversation API
#[derive(Debug, Clone)]
pub struct MemoryTool {
memory: Arc<MemoryManagement>,
schema: Json,
}
impl MemoryTool {
/// Function name used when registering the unified memory API tool.
pub const NAME: &'static str = "memory_api";
/// Creates a new SearchConversationsTool instance
pub fn new(memory: Arc<MemoryManagement>) -> Self {
let schema = memory_tool_schema();
Self { memory, schema }
}
}
fn memory_tool_schema() -> Json {
json!({
"type": "object",
"description": "Select one memory API action with type, then provide the fields used by that action. Fields not used by the selected type should be null.",
"properties": {
"type": {
"type": "string",
"enum": [
"GetResource",
"GetConversation",
"GetConversationDelta",
"StopConversation",
"SteerConversation",
"FollowUpConversation",
"DeleteConversation",
"ListPrevConversations",
"SearchConversations"
],
"description": "Memory API action to perform."
},
"_id": {
"type": ["integer", "null"],
"description": "Resource or conversation ID. Required for actions that target a single resource or conversation."
},
"conversation": {
"type": ["integer", "null"],
"description": "Conversation ID containing the resource. Required for GetResource."
},
"messages_offset": {
"type": ["integer", "null"],
"description": "Messages offset for GetConversationDelta. Use 0 for the first delta read."
},
"artifacts_offset": {
"type": ["integer", "null"],
"description": "Artifacts offset for GetConversationDelta. Use 0 for the first delta read."
},
"message": {
"type": ["string", "null"],
"description": "Message used by SteerConversation or FollowUpConversation."
},
"cursor": {
"type": ["string", "null"],
"description": "Pagination cursor for ListPrevConversations. Use null for the first page."
},
"limit": {
"type": ["integer", "null"],
"description": "Maximum results for listing or searching conversations. Use null for the default."
},
"query": {
"type": ["string", "null"],
"description": "Search query for SearchConversations."
}
},
"required": [
"type",
"_id",
"conversation",
"messages_offset",
"artifacts_offset",
"message",
"cursor",
"limit",
"query"
],
"additionalProperties": false
})
}
impl Tool<BaseCtx> for MemoryTool {
type Args = MemoryToolArgs;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"A unified API for managing conversations and memory. Supports retrieving resources and conversation details, stopping or steering in-progress conversations, sending follow-up messages, deleting conversations, listing previous conversations with pagination, searching conversation history by keyword, and listing KIP command logs.".to_string()
}
fn group(&self) -> Option<ToolGroupInfo> {
Some(memory_tool_group_info())
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description(),
parameters: self.schema.clone(),
strict: Some(true),
}
}
async fn call(
&self,
ctx: BaseCtx,
args: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
match args {
MemoryToolArgs::GetResource { _id, conversation } => {
let conversation = self.memory.get_conversation(conversation).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
// Ownership is established through the conversation, so the resource must
// actually belong to it; otherwise any caller could read arbitrary resources
// by pairing them with a conversation they own.
if !conversation
.resources
.iter()
.chain(conversation.artifacts.iter())
.any(|resource| resource._id == _id)
{
return Err(format!(
"permission denied: resource {_id} does not belong to conversation {}",
conversation._id
)
.into());
}
let mut res = self.memory.get_resource(_id).await?;
if res.blob.is_none()
&& let Some(uri) = &res.uri
{
res.blob = FetchWebResourcesTool::fetch_as_bytes(&ctx, uri).await.ok();
}
Ok(ToolOutput::new(Response::Ok {
result: json!(res),
next_cursor: None,
}))
}
MemoryToolArgs::GetConversation { _id } => {
let conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::GetConversationDelta {
_id,
messages_offset,
artifacts_offset,
} => {
let conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation.into_delta(messages_offset, artifacts_offset)),
next_cursor: None,
}))
}
MemoryToolArgs::StopConversation { _id } => {
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
if conversation.status == ConversationStatus::Working
|| conversation.status == ConversationStatus::Submitted
{
conversation.status = ConversationStatus::Cancelled;
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
(
"status".to_string(),
Fv::Text(conversation.status.to_string()),
),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
}
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::SteerConversation { _id, message } => {
if message.trim().is_empty() {
return Err("steering message cannot be empty".into());
}
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let steering_messages = if let Some(msg) = conversation.steering_messages.clone() {
let mut msgs = msg;
msgs.push(message.clone());
msgs
} else {
vec![message.clone()]
};
conversation.steering_messages = Some(steering_messages.clone());
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
("steering_messages".to_string(), steering_messages.into()),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::FollowUpConversation { _id, message } => {
if message.trim().is_empty() {
return Err("follow-up message cannot be empty".into());
}
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let follow_up_messages = if let Some(msg) = conversation.follow_up_messages.clone()
{
let mut msgs = msg;
msgs.push(message.clone());
msgs
} else {
vec![message.clone()]
};
conversation.follow_up_messages = Some(follow_up_messages.clone());
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
("follow_up_messages".to_string(), follow_up_messages.into()),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::DeleteConversation { _id } => {
let conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let deleted = self.memory.delete_conversation(_id).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!({ "deleted": deleted }),
next_cursor: None,
}))
}
MemoryToolArgs::ListPrevConversations { cursor, limit } => {
// Models often send "" instead of null for the first page.
let cursor = cursor.filter(|cursor| !cursor.is_empty());
let (conversations, next_cursor) = self
.memory
.list_conversations_by_user(ctx.caller(), cursor, limit)
.await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversations),
next_cursor,
}))
}
MemoryToolArgs::SearchConversations { query, limit } => {
let conversations = self
.memory
.search_conversations(ctx.caller(), query, limit)
.await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversations),
next_cursor: None,
}))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
context::{RemoteEngines, Web3SDK},
store::{InMemory, Store},
};
use anda_core::CancellationToken;
use anda_db::database::DBConfig;
use std::collections::BTreeSet;
fn principal(seed: u8) -> Principal {
Principal::self_authenticating([seed; 32])
}
fn message(role: &str, text: &str) -> Message {
Message {
role: role.to_string(),
content: vec![ContentPart::Text {
text: text.to_string(),
}],
user: Some(principal(7)),
timestamp: Some(1_700_000_000_000),
..Default::default()
}
}
fn resource(name: &str, blob: Option<&[u8]>) -> Resource {
Resource {
name: name.to_string(),
tags: vec!["text".to_string()],
description: Some(format!("{name} description")),
mime_type: Some("text/plain".to_string()),
blob: blob.map(|v| ByteBufB64(v.to_vec())),
size: blob.map(|v| v.len() as u64),
metadata: Some(Map::from_iter([("source".to_string(), json!("test"))])),
..Default::default()
}
}
fn conversation(user: Principal, text: &str, period: u64) -> Conversation {
let mut conversation = Conversation {
user,
thread: Some(Xid([period as u8; 12])),
resources: vec![resource("input", Some(b"input text"))],
artifacts: vec![resource("artifact", Some(b"artifact text"))],
status: ConversationStatus::Working,
failed_reason: Some("not finished".to_string()),
usage: Usage {
input_tokens: 10,
output_tokens: 20,
cached_tokens: 3,
requests: 2,
},
steering_messages: Some(vec!["steer-one".to_string()]),
follow_up_messages: Some(vec!["follow-one".to_string()]),
child: Some(99),
ancestors: Some(vec![1, 2]),
label: Some("label-a".to_string()),
extra: Some(json!({"priority": "high"})),
period,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_001,
..Default::default()
};
conversation.append_messages(vec![
message("user", text),
Message {
role: "assistant".to_string(),
content: vec![
ContentPart::Reasoning {
text: "thinking".to_string(),
},
ContentPart::ToolCall {
name: "lookup".to_string(),
args: json!({"q": text}),
call_id: Some("call-1".to_string()),
},
],
timestamp: Some(1_700_000_000_002),
..Default::default()
},
]);
conversation
}
async fn test_db() -> Arc<AndaDB> {
Arc::new(
AndaDB::connect(Arc::new(InMemory::new()), DBConfig::default())
.await
.unwrap(),
)
}
async fn test_conversations(name: &str) -> Conversations {
Conversations::connect(test_db().await, name.to_string())
.await
.unwrap()
}
async fn test_memory() -> Arc<MemoryManagement> {
let db = test_db().await;
let nexus = Arc::new(
CognitiveNexus::connect(db.clone(), async |_nexus| Ok(()))
.await
.unwrap(),
);
Arc::new(MemoryManagement::connect(db, nexus).await.unwrap())
}
fn test_ctx(caller: Principal) -> BaseCtx {
BaseCtx::new(
Principal::anonymous(),
"engine".to_string(),
"agent".to_string(),
CancellationToken::new(),
BTreeSet::new(),
Arc::new(Web3SDK::not_implemented()),
Store::new(Arc::new(InMemory::new())),
Arc::new(RemoteEngines::new()),
)
.with_caller(caller)
}
#[test]
fn test_conversation_status() {
let chat = Conversation {
status: ConversationStatus::Completed,
..Default::default()
};
let rt = ConversationStatus::Completed;
println!("{}", rt);
let rt = serde_json::to_string(&chat).unwrap();
assert!(rt.contains(r#","status":"completed","#));
let chat2: Conversation = serde_json::from_str(&rt).unwrap();
assert_eq!(chat.status, chat2.status);
let args = MemoryToolArgs::GetConversation { _id: 1 };
let rt = serde_json::to_string(&args).unwrap();
assert_eq!(rt, r#"{"type":"GetConversation","_id":1}"#);
let args1: MemoryToolArgs = serde_json::from_str(&rt).unwrap();
assert_eq!(args, args1);
let strict_args: MemoryToolArgs = serde_json::from_value(json!({
"type": "GetConversation",
"_id": 1,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": null,
"limit": null,
"query": null
}))
.unwrap();
assert_eq!(strict_args, args);
let schema = memory_tool_schema();
let required = schema["required"].as_array().unwrap();
let properties = schema["properties"].as_object().unwrap();
assert_eq!(required.len(), properties.len());
for key in properties.keys() {
assert!(required.iter().any(|item| item.as_str() == Some(key)));
}
}
#[test]
fn conversation_conversions_preserve_public_state() {
let user = principal(1);
let conversation = conversation(user, "alpha memory topic", 17);
let changes = conversation.to_changes().unwrap();
assert!(matches!(changes.get("messages"), Some(Fv::Array(_))));
assert!(matches!(changes.get("resources"), Some(Fv::Array(_))));
assert!(matches!(changes.get("artifacts"), Some(Fv::Array(_))));
assert!(matches!(
changes.get("status"),
Some(Fv::Text(v)) if v == "working"
));
assert!(matches!(
changes.get("steering_messages"),
Some(Fv::Array(_))
));
assert!(matches!(
changes.get("follow_up_messages"),
Some(Fv::Array(_))
));
assert!(matches!(changes.get("label"), Some(Fv::Text(v)) if v == "label-a"));
assert!(!matches!(changes.get("extra"), Some(Fv::Null) | None));
assert!(matches!(changes.get("child"), Some(Fv::U64(99))));
assert!(matches!(
changes.get("failed_reason"),
Some(Fv::Text(v)) if v == "not finished"
));
let delta = conversation.to_delta(1, 1);
assert_eq!(delta._id, conversation._id);
assert_eq!(delta.messages.len(), 1);
assert_eq!(delta.artifacts.len(), 0);
assert_eq!(delta.status, ConversationStatus::Working);
assert_eq!(delta.child, Some(99));
let owned_delta = conversation.clone().into_delta(0, 0);
assert_eq!(owned_delta.messages.len(), 2);
assert_eq!(owned_delta.artifacts.len(), 1);
let pruned = PrunedMessage::try_from(Message {
role: "assistant".to_string(),
content: vec![
ContentPart::Text {
text: "visible".to_string(),
},
ContentPart::ToolCall {
name: "hidden".to_string(),
args: json!({}),
call_id: None,
},
],
user: Some(user),
timestamp: Some(1_700_000_000_000),
..Default::default()
})
.unwrap();
assert_eq!(pruned.role, "assistant");
assert_eq!(pruned.content.len(), 2);
assert_eq!(pruned.user, Some(user.to_string()));
assert_eq!(
pruned.timestamp.as_deref(),
Some("2023-11-14T22:13:20.000Z")
);
let doc = Document::from(conversation.clone());
assert_eq!(doc.metadata["_id"], json!(0));
assert_eq!(doc.metadata["type"], json!("Conversation"));
assert_eq!(doc.metadata["user"], json!(user.to_string()));
assert_eq!(doc.metadata["thread"], json!(Xid([17; 12]).to_string()));
assert_eq!(doc.metadata["label"], json!("label-a"));
assert!(doc.content.as_array().unwrap().len() >= 2);
let conversation_ref = ConversationRef::from(&conversation);
assert_eq!(conversation_ref.user, &user);
assert_eq!(conversation_ref.thread, conversation.thread.as_ref());
assert_eq!(conversation_ref.messages.len(), 2);
assert_eq!(conversation_ref.child, &Some(99));
assert_eq!(
ConversationState::from(&conversation_ref).status,
conversation.status
);
assert_eq!(ConversationState::from(&conversation)._id, conversation._id);
let sparse = Conversation {
user,
updated_at: 1_700_000_000_010,
..Default::default()
};
let changes = sparse.to_changes().unwrap();
assert!(matches!(changes.get("steering_messages"), Some(Fv::Null)));
assert!(matches!(changes.get("follow_up_messages"), Some(Fv::Null)));
assert!(matches!(changes.get("label"), Some(Fv::Null)));
assert!(matches!(changes.get("extra"), Some(Fv::Null)));
assert!(!changes.contains_key("child"));
assert!(!changes.contains_key("failed_reason"));
}
#[test]
fn conversation_status_display_and_strict_args_cover_all_variants() {
let statuses = [
(ConversationStatus::Submitted, "submitted"),
(ConversationStatus::Working, "working"),
(ConversationStatus::Idle, "idle"),
(ConversationStatus::Completed, "completed"),
(ConversationStatus::Cancelled, "cancelled"),
(ConversationStatus::Failed, "failed"),
];
for (status, expected) in statuses {
assert_eq!(status.to_string(), expected);
}
let cases = vec![
(
json!({
"type": "GetResource",
"_id": 7,
"conversation": 3,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::GetResource {
_id: 7,
conversation: 3,
},
),
(
json!({
"type": "GetConversationDelta",
"_id": 3,
"conversation": null,
"messages_offset": 1,
"artifacts_offset": 2,
"message": null,
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::GetConversationDelta {
_id: 3,
messages_offset: 1,
artifacts_offset: 2,
},
),
(
json!({
"type": "StopConversation",
"_id": 3,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::StopConversation { _id: 3 },
),
(
json!({
"type": "SteerConversation",
"_id": 3,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": "redirect",
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::SteerConversation {
_id: 3,
message: "redirect".to_string(),
},
),
(
json!({
"type": "FollowUpConversation",
"_id": 3,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": "continue",
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::FollowUpConversation {
_id: 3,
message: "continue".to_string(),
},
),
(
json!({
"type": "DeleteConversation",
"_id": 3,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": null,
"limit": null,
"query": null
}),
MemoryToolArgs::DeleteConversation { _id: 3 },
),
(
json!({
"type": "ListPrevConversations",
"_id": null,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": "abc",
"limit": 5,
"query": null
}),
MemoryToolArgs::ListPrevConversations {
cursor: Some("abc".to_string()),
limit: Some(5),
},
),
(
json!({
"type": "SearchConversations",
"_id": null,
"conversation": null,
"messages_offset": null,
"artifacts_offset": null,
"message": null,
"cursor": null,
"limit": 4,
"query": "alpha"
}),
MemoryToolArgs::SearchConversations {
query: "alpha".to_string(),
limit: Some(4),
},
),
];
for (value, expected) in cases {
let parsed: MemoryToolArgs = serde_json::from_value(value).unwrap();
assert_eq!(parsed, expected);
}
}
#[tokio::test]
async fn conversations_collection_crud_search_and_expiry() {
let conversations = test_conversations("conversation_test").await;
let user = principal(2);
let other_user = principal(3);
let mut first = conversation(user, "alpha searchable topic", 1);
first.status = ConversationStatus::Submitted;
let first_id = conversations
.add_conversation(ConversationRef::from(&first))
.await
.unwrap();
let mut second = conversation(user, "beta searchable topic", 2);
second.status = ConversationStatus::Completed;
second.failed_reason = None;
let second_id = conversations
.add_conversation(ConversationRef::from(&second))
.await
.unwrap();
let other = conversation(other_user, "alpha from another user", 3);
let other_id = conversations
.add_conversation(ConversationRef::from(&other))
.await
.unwrap();
let fetched = conversations.get_conversation(first_id).await.unwrap();
assert_eq!(fetched.user, user);
assert_eq!(fetched.status, ConversationStatus::Submitted);
let mut changes = fetched.to_changes().unwrap();
changes.insert("status".to_string(), Fv::Text("idle".to_string()));
conversations
.update_conversation(first_id, changes)
.await
.unwrap();
assert_eq!(
conversations
.get_conversation(first_id)
.await
.unwrap()
.status,
ConversationStatus::Idle
);
let batch = conversations
.batch_get_conversations(&user, vec![first_id, other_id, second_id])
.await
.unwrap();
assert_eq!(batch.len(), 2);
assert!(batch.iter().all(|item| item.user == user));
let (page, cursor) = conversations
.list_conversations_by_user(&user, None, Some(1))
.await
.unwrap();
assert_eq!(page.len(), 1);
assert!(cursor.is_some());
let (next_page, _) = conversations
.list_conversations_by_user(&user, cursor, Some(10))
.await
.unwrap();
assert_eq!(next_page.len(), 1);
let search = conversations
.search_conversations(&user, "alpha".to_string(), Some(10))
.await
.unwrap();
assert!(search.iter().all(|item| item.user == user));
assert!(!conversations.delete_conversation(999_999).await.unwrap());
assert!(conversations.delete_conversation(other_id).await.unwrap());
let deleted = conversations
.delete_expired_conversations(2 * 3600 * 1000)
.await
.unwrap();
assert_eq!(deleted, 1);
assert!(conversations.get_conversation(first_id).await.is_err());
assert!(conversations.get_conversation(second_id).await.is_ok());
}
#[tokio::test]
async fn pagination_batch_and_expiry_handle_limits_robustly() {
let conversations = test_conversations("pagination_test").await;
let user = principal(8);
let mut ids = Vec::new();
for i in 0..12u64 {
let item = conversation(user, &format!("topic {i}"), i + 1);
ids.push(
conversations
.add_conversation(ConversationRef::from(&item))
.await
.unwrap(),
);
}
// batch_get must return all requested conversations, not the database default of 10.
let batch = conversations
.batch_get_conversations(&user, ids.clone())
.await
.unwrap();
assert_eq!(batch.len(), 12);
assert!(
conversations
.batch_get_conversations(&user, Vec::new())
.await
.unwrap()
.is_empty()
);
// limit 0 is clamped instead of panicking or returning everything.
let (page, _) = conversations
.list_conversations_by_user(&user, None, Some(0))
.await
.unwrap();
assert_eq!(page.len(), 1);
assert_eq!(page[0]._id, *ids.last().unwrap());
// Pages are newest-first and the cursor walks backwards without overlap.
let mut seen = Vec::new();
let mut cursor = None;
loop {
let (page, next) = conversations
.list_conversations_by_user(&user, cursor, Some(5))
.await
.unwrap();
seen.extend(page.iter().map(|conversation| conversation._id));
match next {
Some(next) => cursor = Some(next),
None => break,
}
}
assert_eq!(seen.len(), 12);
assert!(
seen.windows(2).all(|pair| pair[0] > pair[1]),
"pages must be newest-first without duplicates: {seen:?}"
);
// Expired deletion must drain everything, not just the first database batch of 10.
let memory = test_memory().await;
let expiring_user = principal(9);
for i in 0..12u64 {
let item = conversation(expiring_user, &format!("expiring {i}"), i + 1);
memory
.add_conversation(ConversationRef::from(&item))
.await
.unwrap();
}
let deleted = memory
.delete_expired_conversations(13 * 3600 * 1000)
.await
.unwrap();
assert_eq!(deleted, 12);
// Referencing a resource ID that does not exist is rejected.
let err = memory
.try_add_resources(&[Resource {
_id: 999_999,
..resource("ghost", Some(b"ghost"))
}])
.await
.unwrap_err();
assert!(err.to_string().contains("does not exist"));
}
#[tokio::test]
async fn memory_management_resources_conversations_and_descriptions() {
let memory = test_memory().await;
let user = principal(4);
assert_eq!(memory.name(), "execute_kip");
assert!(
memory
.description()
.contains("Knowledge Interaction Protocol")
);
let definition = memory.definition();
assert_eq!(definition.name, "execute_kip");
assert_eq!(definition.strict, None);
assert_eq!(
memory
.as_ref()
.clone()
.with_kip_function_definitions(FunctionDefinition {
name: "custom_kip".to_string(),
description: "custom".to_string(),
parameters: json!({"type": "object"}),
strict: Some(false),
})
.name(),
"custom_kip"
);
let _ = memory.nexus();
assert!(memory.describe_primer().await.is_err());
assert!(memory.describe_system().await.is_err());
let caller = memory
.get_or_init_caller(&user, Some("Ada".to_string()))
.await
.unwrap();
assert_eq!(caller["attributes"]["id"], json!(user.to_string()));
let described = memory.describe_caller(&user).await.unwrap();
assert_eq!(described["attributes"]["name"], json!("Ada"));
let added_resource = resource("resource-a", Some(b"hello resource"));
let resource_id = memory
.add_resource(ResourceRef::from(&added_resource))
.await
.unwrap();
let fetched_resource = memory.get_resource(resource_id).await.unwrap();
assert_eq!(fetched_resource.name, "resource-a");
let existing = Resource {
_id: resource_id,
blob: Some(ByteBufB64(b"ignored blob".to_vec())),
..fetched_resource.clone()
};
let inserted = resource("resource-b", Some(b"new resource"));
let normalized = memory
.try_add_resources(&[existing, inserted])
.await
.unwrap();
assert_eq!(normalized.len(), 2);
assert_eq!(normalized[0]._id, resource_id);
assert!(normalized.iter().all(|item| item.blob.is_none()));
let mut stored = conversation(user, "gamma conversation", 1);
stored.resources = normalized.clone();
stored.artifacts = vec![Resource {
_id: normalized[1]._id,
..normalized[1].clone()
}];
let conversation_id = memory
.add_conversation(ConversationRef::from(&stored))
.await
.unwrap();
assert!(memory.max_conversation_id() >= conversation_id);
assert_eq!(
memory.get_conversation(conversation_id).await.unwrap().user,
user
);
let (listed, _) = memory
.list_conversations_by_user(&user, None, Some(10))
.await
.unwrap();
assert_eq!(listed.len(), 1);
let found = memory
.search_conversations(&user, "gamma".to_string(), Some(10))
.await
.unwrap();
assert!(found.iter().all(|item| item.user == user));
let deleted = memory
.delete_expired_conversations(2 * 3600 * 1000)
.await
.unwrap();
assert_eq!(deleted, 1);
assert!(memory.get_conversation(conversation_id).await.is_err());
assert!(memory.get_resource(resource_id).await.is_err());
}
#[tokio::test]
async fn memory_tools_share_one_capability_group() {
use anda_core::ToolSet;
let memory = test_memory().await;
let conversations = Conversations {
conversations: memory.conversations.clone(),
};
let mut tools = ToolSet::<BaseCtx>::new();
tools.add(memory.clone()).unwrap();
tools
.add(Arc::new(MemoryReadonly::new(memory.clone())))
.unwrap();
tools
.add(Arc::new(GetResourceContentTool::new(memory.clone())))
.unwrap();
tools
.add(Arc::new(ListConversationsTool::new(conversations.clone())))
.unwrap();
tools
.add(Arc::new(SearchConversationsTool::new(conversations)))
.unwrap();
tools
.add(Arc::new(MemoryTool::new(memory.clone())))
.unwrap();
let groups = tools.groups();
assert_eq!(groups.len(), 1);
let group = &groups[0];
assert_eq!(group.id, MEMORY_TOOL_GROUP_ID);
assert!(group.instructions.is_some());
// Every registered memory tool, including the dynamically named KIP tool,
// lands in the one bundle.
for name in [
memory.name(),
MemoryReadonly::NAME.to_string(),
GetResourceContentTool::NAME.to_string(),
ListConversationsTool::NAME.to_string(),
SearchConversationsTool::NAME.to_string(),
MemoryTool::NAME.to_string(),
] {
assert!(group.members.contains(&name), "missing member {name}");
}
}
#[tokio::test]
async fn memory_tools_expose_definitions_and_call_local_paths() {
let memory = test_memory().await;
let user = principal(5);
let ctx = test_ctx(user);
let text_resource = resource("text-resource", Some(b"plain text"));
let text_id = memory
.add_resource(ResourceRef::from(&text_resource))
.await
.unwrap();
let binary_resource = resource("binary-resource", Some(&[0, 159, 146, 150]));
let binary_id = memory
.add_resource(ResourceRef::from(&binary_resource))
.await
.unwrap();
let mut stored = conversation(user, "delta memory api", 3);
stored.resources = vec![Resource {
_id: text_id,
..text_resource.clone()
}];
stored.status = ConversationStatus::Working;
let conversation_id = memory
.add_conversation(ConversationRef::from(&stored))
.await
.unwrap();
let readonly = MemoryReadonly::new(memory.clone());
assert_eq!(readonly.name(), MemoryReadonly::NAME);
assert!(readonly.description().contains("read"));
assert_eq!(readonly.definition().strict, Some(true));
let get_content = GetResourceContentTool::new(memory.clone());
assert_eq!(get_content.name(), GetResourceContentTool::NAME);
assert!(get_content.description().contains("stored resource"));
assert_eq!(get_content.definition().strict, Some(true));
let output = get_content
.call(
ctx.clone(),
GetResourceContentArgs { _id: text_id },
Vec::new(),
)
.await
.unwrap();
match output.output {
Response::Ok { result, .. } => assert_eq!(result, json!("plain text")),
other => panic!("unexpected response: {other:?}"),
}
let output = get_content
.call(
ctx.clone(),
GetResourceContentArgs { _id: binary_id },
Vec::new(),
)
.await
.unwrap();
match output.output {
Response::Ok { result, .. } => {
assert!(result.as_str().unwrap().starts_with("AJ-S"));
}
other => panic!("unexpected response: {other:?}"),
}
let missing_id = memory
.add_resource(ResourceRef::from(&Resource {
name: "empty".to_string(),
tags: vec!["text".to_string()],
..Default::default()
}))
.await
.unwrap();
let err = get_content
.call(
ctx.clone(),
GetResourceContentArgs { _id: missing_id },
Vec::new(),
)
.await
.unwrap_err();
assert!(err.to_string().contains("no blob or uri"));
let list_tool = ListConversationsTool::new(Conversations {
conversations: memory.conversations.clone(),
})
.with_description("custom list".to_string());
assert_eq!(list_tool.name(), ListConversationsTool::NAME);
assert_eq!(list_tool.description(), "custom list");
assert_eq!(list_tool.definition().strict, Some(true));
let output = list_tool
.call(
ctx.clone(),
ListConversationsArgs {
cursor: String::new(),
limit: 0,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let err = list_tool
.call(
ctx.clone(),
ListConversationsArgs {
cursor: "not-a-valid-cursor".to_string(),
limit: 2,
},
Vec::new(),
)
.await
.unwrap_err();
assert!(!err.to_string().is_empty());
let search_tool = SearchConversationsTool::new(Conversations {
conversations: memory.conversations.clone(),
})
.with_description("custom search".to_string());
assert_eq!(search_tool.name(), SearchConversationsTool::NAME);
assert_eq!(search_tool.description(), "custom search");
assert_eq!(search_tool.definition().strict, Some(true));
let output = search_tool
.call(
ctx.clone(),
SearchConversationsArgs {
query: "delta".to_string(),
limit: 0,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let output = search_tool
.call(
ctx.clone(),
SearchConversationsArgs {
query: "delta".to_string(),
limit: 3,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let memory_tool = MemoryTool::new(memory.clone());
assert_eq!(memory_tool.name(), MemoryTool::NAME);
assert!(memory_tool.description().contains("managing conversations"));
assert_eq!(memory_tool.definition().strict, Some(true));
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::GetConversation {
_id: conversation_id,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::GetConversationDelta {
_id: conversation_id,
messages_offset: 1,
artifacts_offset: 0,
},
Vec::new(),
)
.await
.unwrap();
match output.output {
Response::Ok { result, .. } => {
let delta: ConversationDelta = serde_json::from_value(result).unwrap();
assert_eq!(delta.messages.len(), 1);
}
other => panic!("unexpected response: {other:?}"),
}
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::GetResource {
_id: text_id,
conversation: conversation_id,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
// A resource that the conversation does not reference must be rejected even for the
// conversation owner.
let denied = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::GetResource {
_id: binary_id,
conversation: conversation_id,
},
Vec::new(),
)
.await
.unwrap_err();
assert!(denied.to_string().contains("does not belong"));
// Models often send "" for the first page and 0 for the default limit.
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::ListPrevConversations {
cursor: Some(String::new()),
limit: Some(0),
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
for args in [
MemoryToolArgs::GetResource {
_id: text_id,
conversation: conversation_id,
},
MemoryToolArgs::GetConversationDelta {
_id: conversation_id,
messages_offset: 0,
artifacts_offset: 0,
},
MemoryToolArgs::StopConversation {
_id: conversation_id,
},
MemoryToolArgs::SteerConversation {
_id: conversation_id,
message: "denied steering".to_string(),
},
MemoryToolArgs::FollowUpConversation {
_id: conversation_id,
message: "denied follow-up".to_string(),
},
MemoryToolArgs::DeleteConversation {
_id: conversation_id,
},
] {
let denied = memory_tool
.call(test_ctx(principal(6)), args, Vec::new())
.await
.unwrap_err();
assert!(denied.to_string().contains("permission denied"));
}
memory_tool
.call(
ctx.clone(),
MemoryToolArgs::SteerConversation {
_id: conversation_id,
message: "new steering".to_string(),
},
Vec::new(),
)
.await
.unwrap();
memory_tool
.call(
ctx.clone(),
MemoryToolArgs::FollowUpConversation {
_id: conversation_id,
message: "new follow-up".to_string(),
},
Vec::new(),
)
.await
.unwrap();
let updated = memory.get_conversation(conversation_id).await.unwrap();
assert_eq!(updated.steering_messages.unwrap().len(), 2);
assert_eq!(updated.follow_up_messages.unwrap().len(), 2);
assert!(
memory_tool
.call(
ctx.clone(),
MemoryToolArgs::SteerConversation {
_id: conversation_id,
message: " ".to_string(),
},
Vec::new(),
)
.await
.unwrap_err()
.to_string()
.contains("steering message cannot be empty")
);
assert!(
memory_tool
.call(
ctx.clone(),
MemoryToolArgs::FollowUpConversation {
_id: conversation_id,
message: String::new(),
},
Vec::new(),
)
.await
.unwrap_err()
.to_string()
.contains("follow-up message cannot be empty")
);
memory_tool
.call(
ctx.clone(),
MemoryToolArgs::StopConversation {
_id: conversation_id,
},
Vec::new(),
)
.await
.unwrap();
assert_eq!(
memory
.get_conversation(conversation_id)
.await
.unwrap()
.status,
ConversationStatus::Cancelled
);
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::StopConversation {
_id: conversation_id,
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::ListPrevConversations {
cursor: None,
limit: Some(10),
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let output = memory_tool
.call(
ctx.clone(),
MemoryToolArgs::SearchConversations {
query: "delta".to_string(),
limit: Some(10),
},
Vec::new(),
)
.await
.unwrap();
assert!(matches!(output.output, Response::Ok { .. }));
let denied = memory_tool
.call(
test_ctx(principal(6)),
MemoryToolArgs::GetConversation {
_id: conversation_id,
},
Vec::new(),
)
.await
.unwrap_err();
assert!(denied.to_string().contains("permission denied"));
let output = memory_tool
.call(
ctx,
MemoryToolArgs::DeleteConversation {
_id: conversation_id,
},
Vec::new(),
)
.await
.unwrap();
match output.output {
Response::Ok { result, .. } => assert_eq!(result["deleted"], json!(true)),
other => panic!("unexpected response: {other:?}"),
}
}
}