use anda_cognitive_nexus::{CognitiveNexus, ConceptPK};
use anda_core::{
BoxError, ContentPart, Document, Documents, FunctionDefinition, Message, Resource, ResourceRef,
StateFeatures, Tool, ToolOutput, Usage, Xid, gen_schema_for,
};
use anda_db::{
collection::{Collection, CollectionConfig},
database::AndaDB,
error::DBError,
index::BTree,
query::{Filter, Query, RangeQuery, Search},
};
use anda_db_schema::{AndaDBSchema, FieldEntry, FieldType, Ft, Fv, Json, Schema, SchemaError};
use anda_db_tfs::jieba_tokenizer;
use anda_kip::{
DescribeTarget, KipError, META_SYSTEM_NAME, MetaCommand, PERSON_TYPE, Request, Response,
};
use candid::Principal;
use ciborium::cbor;
use ic_auth_types::ByteBufB64;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Map, json};
use std::{
collections::BTreeMap,
fmt,
sync::{Arc, LazyLock},
};
use crate::{context::BaseCtx, extension::fetch::FetchWebResourcesTool, rfc3339_datetime, unix_ms};
/// Default function definition of the `execute_kip` tool.
///
/// The value is built lazily on first access by deserializing the JSON literal
/// below into a [`FunctionDefinition`]. The `unwrap` panics if the literal does
/// not deserialize, which would indicate a programming error in this file.
pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
serde_json::from_value(json!({
"name": "execute_kip",
"description": "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to interact with your persistent memory.",
"parameters": {
"type": "object",
"properties": {
"commands": {
"type": "array",
"description": "An array of KIP commands for batch execution (reduces round-trips). Commands are executed sequentially; execution stops on first KML error.",
"items": {
"type": "string"
}
},
"parameters": {
"type": "object",
"description": "An optional JSON object of key-value pairs used for safe substitution of placeholders in the command string(s). Placeholders should start with ':' (e.g., :name, :limit). IMPORTANT: A placeholder must represent a complete JSON value token (e.g., name: :name). Do not embed placeholders inside quoted strings (e.g., \"Hello :name\"), because substitution uses JSON serialization."
},
},
"required": ["commands"]
}
})).unwrap()
});
// A stored conversation record; the `AndaDBSchema` derive generates the
// collection schema from these fields.
// NOTE(review): plain `//` comments are used on purpose here, in case the
// schema derive reads doc attributes — confirm before switching to `///`.
#[derive(Debug, Clone, Deserialize, Serialize, AndaDBSchema)]
pub struct Conversation {
// Document id of the conversation.
pub _id: u64,
// Owner of the conversation, stored as bytes.
#[field_type = "Bytes"]
pub user: Principal,
// Optional thread identifier, stored as bytes when present.
#[field_type = "Option<Bytes>"]
#[serde(skip_serializing_if = "Option::is_none")]
pub thread: Option<Xid>,
// Message history; `append_messages` stores each `Message` as a JSON value.
pub messages: Vec<Json>,
// Resources attached to the conversation.
pub resources: Vec<Resource>,
// Artifacts attached to the conversation.
pub artifacts: Vec<Resource>,
// Lifecycle status, stored as text.
#[field_type = "Text"]
pub status: ConversationStatus,
// Optional failure description.
#[serde(skip_serializing_if = "Option::is_none")]
pub failed_reason: Option<String>,
// Usage counters, stored as a map of string keys to u64 values.
#[field_type = "Map<String, U64>"]
pub usage: Usage,
// Steering messages appended by `MemoryToolArgs::SteerConversation`.
#[serde(skip_serializing_if = "Option::is_none")]
pub steering_messages: Option<Vec<String>>,
// Follow-up messages appended by `MemoryToolArgs::FollowUpConversation`.
#[serde(skip_serializing_if = "Option::is_none")]
pub follow_up_messages: Option<Vec<String>>,
// Optional list of conversation ids; presumably the parent chain — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub ancestors: Option<Vec<u64>>,
// Optional label.
#[serde(skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
// Optional free-form JSON payload.
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<Json>,
// Expiry bucket: `delete_expired_conversations` removes records whose
// `period` is lower than `timestamp / 3600 / 1000`.
pub period: u64,
// Creation timestamp; rendered through `rfc3339_datetime` for documents.
pub created_at: u64,
// Last-update timestamp; set from `unix_ms()` by the memory tool.
pub updated_at: u64,
}
impl Default for Conversation {
/// Returns an empty conversation: id 0, the anonymous principal as owner,
/// empty message/resource/artifact lists, the default status and usage,
/// every optional field unset, and all numeric timestamps set to 0.
fn default() -> Self {
Self {
_id: 0,
user: Principal::anonymous(),
thread: None,
messages: Vec::new(),
resources: Vec::new(),
artifacts: Vec::new(),
status: ConversationStatus::default(),
failed_reason: None,
usage: Usage::default(),
steering_messages: None,
follow_up_messages: None,
ancestors: None,
label: None,
extra: None,
period: 0,
created_at: 0,
updated_at: 0,
}
}
}
impl Conversation {
    /// Appends `message` to the stored history, converting every [`Message`]
    /// into a JSON value.
    pub fn append_messages(&mut self, message: Vec<Message>) {
        self.messages.extend(message.into_iter().map(|v| json!(v)));
    }

    /// Builds the field map used to update the mutable part of a stored
    /// conversation.
    ///
    /// The map always contains `messages`, `artifacts`, `status`, `usage`,
    /// `updated_at`, `steering_messages`, `follow_up_messages`, `label` and
    /// `extra` (unset optional values are written as `Fv::Null`).
    /// `failed_reason` is only included when it is set.
    ///
    /// # Errors
    ///
    /// Returns an error when a field cannot be encoded to CBOR or converted
    /// into the expected field value type. Encoding failures are reported to
    /// the caller instead of panicking.
    pub fn to_changes(&self) -> Result<BTreeMap<String, Fv>, BoxError> {
        let messages =
            cbor!(self.messages).map_err(|err| format!("failed to encode messages: {err}"))?;
        let artifacts =
            cbor!(self.artifacts).map_err(|err| format!("failed to encode artifacts: {err}"))?;
        let usage = cbor!(self.usage).map_err(|err| format!("failed to encode usage: {err}"))?;

        let mut changes = BTreeMap::from([
            (
                "messages".to_string(),
                Fv::array_from(messages, &[Ft::Json])?,
            ),
            (
                "artifacts".to_string(),
                Fv::array_from(artifacts, &[Resource::field_type()])?,
            ),
            ("status".to_string(), Fv::Text(self.status.to_string())),
            (
                "usage".to_string(),
                Fv::map_from(usage, &BTreeMap::from([("*".into(), Ft::U64)]))?,
            ),
            ("updated_at".to_string(), Fv::U64(self.updated_at)),
            (
                "steering_messages".to_string(),
                self.steering_messages
                    .clone()
                    .map_or(Fv::Null, |msgs| msgs.into()),
            ),
            (
                "follow_up_messages".to_string(),
                self.follow_up_messages
                    .clone()
                    .map_or(Fv::Null, |msgs| msgs.into()),
            ),
            (
                "label".to_string(),
                self.label.clone().map_or(Fv::Null, |label| label.into()),
            ),
            (
                "extra".to_string(),
                self.extra.clone().map_or(Fv::Null, |extra| extra.into()),
            ),
        ]);

        // `failed_reason` is only written when present; an unset value leaves
        // the stored field untouched.
        if let Some(reason) = &self.failed_reason {
            changes.insert("failed_reason".to_string(), Fv::Text(reason.clone()));
        }
        Ok(changes)
    }
}
/// A reduced view of a [`Message`] used as document content when a
/// conversation is converted into a [`Document`].
#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct PrunedMessage {
/// Role copied from the source message.
pub role: String,
/// Content parts left after `Message::prune_content` ran.
pub content: Vec<ContentPart>,
/// Optional name copied from the source message.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// String form of the source message's `user`, when present.
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
/// Source timestamp rendered through `rfc3339_datetime`, when that succeeds.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
}
impl PrunedMessage {
    /// Converts `msg` into its pruned form.
    ///
    /// The message content is pruned in place first; the user is rendered as a
    /// string and the timestamp through `rfc3339_datetime`. The current
    /// implementation always returns `Some`.
    pub fn try_from(mut msg: Message) -> Option<Self> {
        msg.prune_content();

        let user = msg.user.map(|u| u.to_string());
        let timestamp = msg.timestamp.and_then(rfc3339_datetime);
        let pruned = Self {
            role: msg.role,
            content: msg.content,
            name: msg.name,
            user,
            timestamp,
        };
        Some(pruned)
    }
}
impl From<Conversation> for Document {
    /// Converts a conversation into a [`Document`].
    ///
    /// The metadata carries `_id`, `type` (always `"Conversation"`), `user`,
    /// `status`, and — when available — `created_at`, `updated_at`, `thread`
    /// and `label`. The content is the list of messages that deserialize as
    /// [`Message`], each reduced to a [`PrunedMessage`]; messages that fail to
    /// deserialize are skipped.
    fn from(conversation: Conversation) -> Self {
        let mut metadata = BTreeMap::from([
            ("_id".to_string(), conversation._id.into()),
            ("type".to_string(), "Conversation".into()),
            ("user".to_string(), conversation.user.to_string().into()),
            ("status".to_string(), conversation.status.to_string().into()),
        ]);

        // This conversion cannot fail, so a timestamp that cannot be rendered
        // is left out of the metadata instead of panicking.
        for (key, timestamp) in [
            ("created_at", conversation.created_at),
            ("updated_at", conversation.updated_at),
        ] {
            if let Some(datetime) = rfc3339_datetime(timestamp) {
                metadata.insert(key.to_string(), datetime.into());
            }
        }

        if let Some(thread) = conversation.thread {
            metadata.insert("thread".to_string(), thread.to_string().into());
        }
        if let Some(label) = conversation.label {
            metadata.insert("label".to_string(), label.into());
        }

        // The conversation is owned, so messages are consumed rather than cloned.
        let messages: Vec<PrunedMessage> = conversation
            .messages
            .into_iter()
            .filter_map(|value| {
                serde_json::from_value::<Message>(value)
                    .ok()
                    .and_then(PrunedMessage::try_from)
            })
            .collect();

        Self {
            content: serde_json::to_value(messages).unwrap_or_default(),
            metadata,
        }
    }
}
/// A borrowed, serialize-only view of a [`Conversation`], used to add a
/// conversation to the collection without cloning it.
///
/// Optional fields are skipped when unset, matching how [`Conversation`]
/// itself serializes.
///
/// NOTE(review): this view has no `failed_reason` field, so that value is not
/// written when a conversation is added through it.
#[derive(Debug, Serialize)]
pub struct ConversationRef<'a> {
    /// Document id of the conversation.
    pub _id: u64,
    /// Owner of the conversation.
    pub user: &'a Principal,
    /// Optional thread identifier.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub thread: Option<&'a Xid>,
    /// Message history as JSON values.
    pub messages: &'a [Json],
    /// Resources attached to the conversation.
    pub resources: &'a [Resource],
    /// Artifacts attached to the conversation.
    pub artifacts: &'a [Resource],
    /// Lifecycle status.
    pub status: &'a ConversationStatus,
    /// Usage counters.
    pub usage: &'a Usage,
    /// Pending steering messages.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub steering_messages: &'a Option<Vec<String>>,
    /// Pending follow-up messages.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub follow_up_messages: &'a Option<Vec<String>>,
    /// Optional label.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub label: &'a Option<String>,
    /// Optional free-form JSON payload.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: &'a Option<Json>,
    /// Expiry bucket compared by `delete_expired_conversations`.
    pub period: u64,
    /// Creation timestamp.
    pub created_at: u64,
    /// Last-update timestamp.
    pub updated_at: u64,
    /// Optional list of related conversation ids.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ancestors: &'a Option<Vec<u64>>,
}
impl<'a> From<&'a Conversation> for ConversationRef<'a> {
/// Borrows every field of `conversation` that the view exposes; scalar
/// fields (`_id`, `period`, timestamps) are copied. `failed_reason` has no
/// counterpart in the view and is not carried over.
fn from(conversation: &'a Conversation) -> Self {
Self {
_id: conversation._id,
user: &conversation.user,
thread: conversation.thread.as_ref(),
messages: &conversation.messages,
resources: &conversation.resources,
artifacts: &conversation.artifacts,
status: &conversation.status,
usage: &conversation.usage,
steering_messages: &conversation.steering_messages,
follow_up_messages: &conversation.follow_up_messages,
label: &conversation.label,
extra: &conversation.extra,
period: conversation.period,
created_at: conversation.created_at,
updated_at: conversation.updated_at,
ancestors: &conversation.ancestors,
}
}
}
/// Minimal snapshot of a conversation: its id and current status.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ConversationState {
/// Document id of the conversation.
pub _id: u64,
/// Status at the time the snapshot was taken.
pub status: ConversationStatus,
}
impl From<&ConversationRef<'_>> for ConversationState {
/// Copies the id and clones the status out of a borrowed conversation view.
fn from(conversation: &ConversationRef<'_>) -> Self {
Self {
_id: conversation._id,
status: conversation.status.clone(),
}
}
}
impl From<&Conversation> for ConversationState {
/// Copies the id and clones the status out of a conversation.
fn from(conversation: &Conversation) -> Self {
Self {
_id: conversation._id,
status: conversation.status.clone(),
}
}
}
/// Lifecycle status of a conversation.
///
/// Serialized with serde as the lowercase variant name (for example
/// `"completed"`); `Submitted` is the default.
#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ConversationStatus {
/// Default status of a new conversation.
#[default]
Submitted,
/// The conversation is in progress; it can still be cancelled by the memory tool.
Working,
/// The conversation finished.
Completed,
/// The conversation was cancelled (set by `MemoryToolArgs::StopConversation`).
Cancelled,
/// The conversation failed.
Failed,
}
impl fmt::Display for ConversationStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ConversationStatus::Submitted => write!(f, "submitted"),
ConversationStatus::Working => write!(f, "working"),
ConversationStatus::Completed => write!(f, "completed"),
ConversationStatus::Cancelled => write!(f, "cancelled"),
ConversationStatus::Failed => write!(f, "failed"),
}
}
}
/// Handle to a conversations collection; cloning shares the same underlying
/// collection through the `Arc`.
#[derive(Debug, Clone)]
pub struct Conversations {
/// The collection that stores [`Conversation`] documents.
pub conversations: Arc<Collection>,
}
impl Conversations {
    /// Opens (or creates) the conversations collection called `name` in `db`.
    ///
    /// The collection uses the [`Conversation`] schema at version 3, the jieba
    /// tokenizer, B-tree indexes on `user`, `thread` and `period`, and a BM25
    /// index over `messages`, `resources` and `artifacts`.
    pub async fn connect(db: Arc<AndaDB>, name: String) -> Result<Self, BoxError> {
        let mut schema = Conversation::schema()?;
        schema.with_version(3);
        let conversations = db
            .open_or_create_collection(
                schema,
                CollectionConfig {
                    name,
                    description: "conversations collection".to_string(),
                },
                async |collection| {
                    collection.set_tokenizer(jieba_tokenizer());
                    collection.create_btree_index_nx(&["user"]).await?;
                    collection.create_btree_index_nx(&["thread"]).await?;
                    collection.create_btree_index_nx(&["period"]).await?;
                    collection
                        .create_bm25_index_nx(&["messages", "resources", "artifacts"])
                        .await?;
                    Ok::<(), DBError>(())
                },
            )
            .await?;
        Ok(Self { conversations })
    }

    /// Adds `conversation` to the collection, flushes, and returns the new id.
    pub async fn add_conversation(
        &self,
        conversation: ConversationRef<'_>,
    ) -> Result<u64, DBError> {
        let id = self.conversations.add_from(&conversation).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(id)
    }

    /// Applies `fields` to the conversation with the given `id` and flushes.
    pub async fn update_conversation(
        &self,
        id: u64,
        fields: BTreeMap<String, Fv>,
    ) -> Result<(), DBError> {
        self.conversations.update(id, fields).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(())
    }

    /// Loads the conversation with the given `id`.
    pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
        self.conversations.get_as(id).await
    }

    /// Removes the conversation with the given `id` and flushes.
    ///
    /// Returns `true` when a document was actually removed.
    pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
        let doc = self.conversations.remove(id).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(doc.is_some())
    }

    /// Loads the conversations whose id is in `ids` and that belong to `user`.
    pub async fn batch_get_conversations(
        &self,
        user: &Principal,
        ids: Vec<u64>,
    ) -> Result<Vec<Conversation>, BoxError> {
        let filter = Some(Filter::And(vec![
            Box::new(Filter::Field((
                "_id".to_string(),
                RangeQuery::Include(ids.into_iter().map(Fv::U64).collect()),
            ))),
            Box::new(Filter::Field((
                "user".to_string(),
                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
            ))),
        ]));
        let rt: Vec<Conversation> = self
            .conversations
            .search_as(Query {
                search: None,
                filter,
                limit: None,
            })
            .await?;
        Ok(rt)
    }

    /// Lists conversations of `user` whose id is lower than the cursor.
    ///
    /// `limit` defaults to 10 and is capped at 100. Without a cursor the
    /// listing starts just above the current maximum document id. A next
    /// cursor is returned only when a full page was produced.
    ///
    /// NOTE(review): the next cursor is derived from the first returned item;
    /// whether that is the right end of the page depends on the order in
    /// which the collection returns results — confirm.
    pub async fn list_conversations_by_user(
        &self,
        user: &Principal,
        cursor: Option<String>,
        limit: Option<usize>,
    ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
        let limit = limit.unwrap_or(10).min(100);
        let cursor = match BTree::from_cursor::<u64>(&cursor)? {
            Some(cursor) => cursor,
            None => self.conversations.max_document_id() + 1,
        };
        let filter = Some(Filter::And(vec![
            Box::new(Filter::Field((
                "user".to_string(),
                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
            ))),
            Box::new(Filter::Field((
                "_id".to_string(),
                RangeQuery::Lt(Fv::U64(cursor)),
            ))),
        ]));
        let rt: Vec<Conversation> = self
            .conversations
            .search_as(Query {
                search: None,
                filter,
                limit: Some(limit),
            })
            .await?;
        // A caller-supplied limit of 0 makes `rt.len() >= limit` hold for an
        // empty page, so the first item is looked up without unwrapping.
        let next_cursor = if limit > 0 && rt.len() >= limit {
            rt.first().and_then(|first| BTree::to_cursor(&first._id))
        } else {
            None
        };
        Ok((rt, next_cursor))
    }

    /// Runs a full-text search for `query` over the conversations of `user`.
    ///
    /// `limit` is passed to the collection unchanged.
    pub async fn search_conversations(
        &self,
        user: &Principal,
        query: String,
        limit: Option<usize>,
    ) -> Result<Vec<Conversation>, BoxError> {
        let rt = self
            .conversations
            .search_as(Query {
                search: Some(Search {
                    text: Some(query),
                    logical_search: true,
                    ..Default::default()
                }),
                filter: Some(Filter::Field((
                    "user".to_string(),
                    RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
                ))),
                limit,
            })
            .await?;
        Ok(rt)
    }

    /// Removes every conversation whose `period` is lower than
    /// `timestamp / 3600 / 1000` and returns how many were actually removed.
    ///
    /// Removal is best-effort: a failure on one conversation does not stop
    /// the sweep, but that conversation is not counted.
    pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
        let period = timestamp / 3600 / 1000;
        let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
        let ids = self
            .conversations
            .search_ids(Query {
                search: None,
                filter: Some(filter),
                limit: None,
            })
            .await?;

        let mut count = 0u64;
        for id in ids {
            if let Ok(Some(_)) = self.conversations.remove(id).await {
                count += 1;
            }
        }

        let now_ms = unix_ms();
        self.conversations.flush(now_ms).await?;
        Ok(count)
    }
}
/// Bundles the cognitive nexus with the conversations and resources
/// collections, and serves as the `execute_kip` tool.
#[derive(Debug, Clone)]
pub struct MemoryManagement {
/// The cognitive nexus that KIP requests are executed against.
pub nexus: Arc<CognitiveNexus>,
/// Collection storing [`Conversation`] documents.
pub conversations: Arc<Collection>,
/// Collection storing [`Resource`] documents.
pub resources: Arc<Collection>,
/// Function definition reported by the tool; defaults to `FUNCTION_DEFINITION`.
pub kip_function_definitions: FunctionDefinition,
}
impl MemoryManagement {
    /// Opens (or creates) the `conversations` and `resources` collections in
    /// `db` and wraps them together with `nexus`.
    ///
    /// The conversations collection uses the [`Conversation`] schema at
    /// version 3 with B-tree indexes on `user`, `thread` and `period` and a
    /// BM25 index over `messages`, `resources` and `artifacts`. The resources
    /// collection has B-tree indexes on `tags`, `hash` and `mime_type` and a
    /// BM25 index over `name`, `description` and `metadata`. Both use the
    /// jieba tokenizer.
    pub async fn connect(db: Arc<AndaDB>, nexus: Arc<CognitiveNexus>) -> Result<Self, BoxError> {
        let mut schema = Conversation::schema()?;
        schema.with_version(3);
        let conversations = db
            .open_or_create_collection(
                schema,
                CollectionConfig {
                    name: "conversations".to_string(),
                    description: "conversations collection".to_string(),
                },
                async |collection| {
                    collection.set_tokenizer(jieba_tokenizer());
                    collection.create_btree_index_nx(&["user"]).await?;
                    collection.create_btree_index_nx(&["thread"]).await?;
                    collection.create_btree_index_nx(&["period"]).await?;
                    collection
                        .create_bm25_index_nx(&["messages", "resources", "artifacts"])
                        .await?;
                    Ok::<(), DBError>(())
                },
            )
            .await?;

        let schema = Resource::schema()?;
        let resources = db
            .open_or_create_collection(
                schema,
                CollectionConfig {
                    name: "resources".to_string(),
                    description: "Resources collection".to_string(),
                },
                async |collection| {
                    collection.set_tokenizer(jieba_tokenizer());
                    collection.create_btree_index_nx(&["tags"]).await?;
                    collection.create_btree_index_nx(&["hash"]).await?;
                    collection.create_btree_index_nx(&["mime_type"]).await?;
                    collection
                        .create_bm25_index_nx(&["name", "description", "metadata"])
                        .await?;
                    Ok::<(), DBError>(())
                },
            )
            .await?;

        Ok(Self {
            nexus,
            conversations,
            resources,
            kip_function_definitions: FUNCTION_DEFINITION.clone(),
        })
    }

    /// Replaces the function definition reported by the `execute_kip` tool.
    pub fn with_kip_function_definitions(mut self, def: FunctionDefinition) -> Self {
        self.kip_function_definitions = def;
        self
    }

    /// Returns a shared handle to the cognitive nexus.
    pub fn nexus(&self) -> Arc<CognitiveNexus> {
        self.nexus.clone()
    }

    /// Returns the largest document id of the conversations collection.
    pub fn max_conversation_id(&self) -> u64 {
        self.conversations.max_document_id()
    }

    /// Runs the `DESCRIBE PRIMER` meta command and returns its result.
    pub async fn describe_primer(&self) -> Result<Json, KipError> {
        let (primer, _) = self
            .nexus
            .execute_meta(MetaCommand::Describe(DescribeTarget::Primer))
            .await?;
        Ok(primer)
    }

    /// Returns a JSON object with the system's identity concept node
    /// (`identity`) and the described domains (`domains`).
    pub async fn describe_system(&self) -> Result<Json, KipError> {
        let system = self
            .nexus
            .get_concept(&ConceptPK::Object {
                r#type: PERSON_TYPE.to_string(),
                name: META_SYSTEM_NAME.to_string(),
            })
            .await?;
        let (domains, _) = self
            .nexus
            .execute_meta(MetaCommand::Describe(DescribeTarget::Domains))
            .await?;
        Ok(json!({
            "identity": system.to_concept_node(),
            "domains": domains,
        }))
    }

    /// Returns the person concept node whose name is the text form of `id`.
    pub async fn describe_caller(&self, id: &Principal) -> Result<Json, KipError> {
        let user = self
            .nexus
            .get_concept(&ConceptPK::Object {
                r#type: PERSON_TYPE.to_string(),
                name: id.to_string(),
            })
            .await?;
        Ok(user.to_concept_node())
    }

    /// Returns the person concept node for `id`, creating it when missing.
    ///
    /// A newly created concept gets the attributes `id`, `person_class`
    /// (`"Human"`) and, when provided, `name`, plus the metadata `author`
    /// (`"$system"`) and `status` (`"active"`).
    pub async fn get_or_init_caller(
        &self,
        id: &Principal,
        name: Option<String>,
    ) -> Result<Json, KipError> {
        let mut attributes = Map::new();
        let mut metadata = Map::new();
        attributes.insert("id".to_string(), id.to_string().into());
        attributes.insert("person_class".to_string(), "Human".into());
        if let Some(name) = name {
            attributes.insert("name".to_string(), name.into());
        }
        metadata.insert("author".to_string(), "$system".into());
        metadata.insert("status".to_string(), "active".into());

        let user = self
            .nexus
            .get_or_init_concept(
                PERSON_TYPE.to_string(),
                id.to_string(),
                attributes,
                metadata,
            )
            .await?;
        Ok(user.to_concept_node())
    }

    /// Adds `resource` to the resources collection, flushes, and returns its id.
    pub async fn add_resource(&self, resource: ResourceRef<'_>) -> Result<u64, DBError> {
        let id = self.resources.add_from(&resource).await?;
        self.resources.flush(unix_ms()).await?;
        Ok(id)
    }

    /// Ensures every resource in `resources` has a stored id and returns
    /// copies that carry that id and no blob.
    ///
    /// Resources that already have a non-zero `_id` are not stored again. When
    /// the collection reports that a resource already exists, the existing id
    /// is reused. The collection is flushed only if something was added.
    ///
    /// # Errors
    ///
    /// Returns the first storage error other than "already exists".
    pub async fn try_add_resources(
        &self,
        resources: &[Resource],
    ) -> Result<Vec<Resource>, BoxError> {
        let mut stored: Vec<Resource> = Vec::with_capacity(resources.len());
        let mut added = false;
        for r in resources.iter() {
            let rf: ResourceRef = r.into();
            let id = if r._id > 0 {
                r._id
            } else {
                match self.resources.add_from(&rf).await {
                    Ok(id) => {
                        added = true;
                        id
                    }
                    Err(DBError::AlreadyExists { _id, .. }) => _id,
                    Err(err) => return Err(err.into()),
                }
            };
            stored.push(Resource {
                _id: id,
                blob: None,
                ..r.clone()
            });
        }

        if added {
            self.resources.flush(unix_ms()).await?;
        }
        Ok(stored)
    }

    /// Loads the resource with the given `id`.
    pub async fn get_resource(&self, id: u64) -> Result<Resource, DBError> {
        self.resources.get_as(id).await
    }

    /// Adds `conversation` to the collection, flushes, and returns the new id.
    pub async fn add_conversation(
        &self,
        conversation: ConversationRef<'_>,
    ) -> Result<u64, DBError> {
        let id = self.conversations.add_from(&conversation).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(id)
    }

    /// Applies `fields` to the conversation with the given `id` and flushes.
    pub async fn update_conversation(
        &self,
        id: u64,
        fields: BTreeMap<String, Fv>,
    ) -> Result<(), DBError> {
        self.conversations.update(id, fields).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(())
    }

    /// Loads the conversation with the given `id`.
    pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
        self.conversations.get_as(id).await
    }

    /// Removes the conversation with the given `id` and flushes.
    ///
    /// Returns `true` when a document was actually removed. Resources
    /// referenced by the conversation are left in place.
    pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
        let doc = self.conversations.remove(id).await?;
        self.conversations.flush(unix_ms()).await?;
        Ok(doc.is_some())
    }

    /// Lists conversations of `user` whose id is lower than the cursor.
    ///
    /// `limit` defaults to 10 and is capped at 100. Without a cursor the
    /// listing starts just above the current maximum document id. A next
    /// cursor is returned only when a full page was produced.
    ///
    /// NOTE(review): the next cursor is derived from the first returned item;
    /// whether that is the right end of the page depends on the order in
    /// which the collection returns results — confirm.
    pub async fn list_conversations_by_user(
        &self,
        user: &Principal,
        cursor: Option<String>,
        limit: Option<usize>,
    ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
        let limit = limit.unwrap_or(10).min(100);
        let cursor = match BTree::from_cursor::<u64>(&cursor)? {
            Some(cursor) => cursor,
            None => self.conversations.max_document_id() + 1,
        };
        let filter = Some(Filter::And(vec![
            Box::new(Filter::Field((
                "user".to_string(),
                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
            ))),
            Box::new(Filter::Field((
                "_id".to_string(),
                RangeQuery::Lt(Fv::U64(cursor)),
            ))),
        ]));
        let rt: Vec<Conversation> = self
            .conversations
            .search_as(Query {
                search: None,
                filter,
                limit: Some(limit),
            })
            .await?;
        // A caller-supplied limit of 0 makes `rt.len() >= limit` hold for an
        // empty page, so the first item is looked up without unwrapping.
        let next_cursor = if limit > 0 && rt.len() >= limit {
            rt.first().and_then(|first| BTree::to_cursor(&first._id))
        } else {
            None
        };
        Ok((rt, next_cursor))
    }

    /// Runs a full-text search for `query` over the conversations of `user`.
    ///
    /// `limit` is passed to the collection unchanged.
    pub async fn search_conversations(
        &self,
        user: &Principal,
        query: String,
        limit: Option<usize>,
    ) -> Result<Vec<Conversation>, BoxError> {
        let rt = self
            .conversations
            .search_as(Query {
                search: Some(Search {
                    text: Some(query),
                    logical_search: true,
                    ..Default::default()
                }),
                filter: Some(Filter::Field((
                    "user".to_string(),
                    RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
                ))),
                limit,
            })
            .await?;
        Ok(rt)
    }

    /// Removes every conversation whose `period` is lower than
    /// `timestamp / 3600 / 1000`, together with the stored resources and
    /// artifacts it references, and returns how many conversations were
    /// actually removed.
    ///
    /// Removal is best-effort: a failure on one conversation or resource does
    /// not stop the sweep, but a conversation that could not be removed is not
    /// counted.
    ///
    /// NOTE(review): `try_add_resources` reuses the id of an already existing
    /// resource, so a resource may be referenced by more than one
    /// conversation; removing it here would affect the others — confirm this
    /// is intended.
    pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
        let period = timestamp / 3600 / 1000;
        let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
        let ids = self
            .conversations
            .search_ids(Query {
                search: None,
                filter: Some(filter),
                limit: None,
            })
            .await?;

        let mut count = 0u64;
        for id in ids {
            let Ok(Some(doc)) = self.conversations.remove(id).await else {
                continue;
            };
            count += 1;

            let Ok(conversation) = doc.try_into::<Conversation>() else {
                continue;
            };
            for resource in conversation
                .resources
                .into_iter()
                .chain(conversation.artifacts)
            {
                if resource._id > 0 {
                    let _ = self.resources.remove(resource._id).await;
                }
            }
        }

        let now_ms = unix_ms();
        self.conversations.flush(now_ms).await?;
        self.resources.flush(now_ms).await?;
        Ok(count)
    }
}
/// Exposes [`MemoryManagement`] as the KIP execution tool; name, description
/// and definition all come from `kip_function_definitions`.
impl Tool<BaseCtx> for MemoryManagement {
type Args = Request;
type Output = Response;
/// Returns the name stored in the configured function definition.
fn name(&self) -> String {
self.kip_function_definitions.name.clone()
}
/// Returns the description stored in the configured function definition.
fn description(&self) -> String {
self.kip_function_definitions.description.clone()
}
/// Returns a copy of the configured function definition.
fn definition(&self) -> FunctionDefinition {
self.kip_function_definitions.clone()
}
/// Executes `request` against the nexus and wraps the response.
///
/// The first element of the tuple returned by `execute` is discarded; the
/// context and attached resources are not used.
async fn call(
&self,
_ctx: BaseCtx,
request: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let (_, res) = request.execute(self.nexus.as_ref()).await;
Ok(ToolOutput::new(res))
}
}
/// Tool wrapper around a shared [`MemoryManagement`] that marks each request
/// with `readonly()` before executing it.
#[derive(Debug, Clone)]
pub struct MemoryReadonly {
// Shared memory backend whose nexus the requests run against.
memory: Arc<MemoryManagement>,
}
impl MemoryReadonly {
/// Name under which the tool is registered.
pub const NAME: &'static str = "execute_kip_readonly";
/// Creates the tool on top of the shared `memory` backend.
pub fn new(memory: Arc<MemoryManagement>) -> Self {
Self { memory }
}
}
impl Tool<BaseCtx> for MemoryReadonly {
type Args = Request;
type Output = Response;
/// Returns [`MemoryReadonly::NAME`].
fn name(&self) -> String {
Self::NAME.to_string()
}
/// Returns the fixed description of the read-only tool.
fn description(&self) -> String {
"Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to read from your persistent memory. This tool does not allow any modifications to the memory and is safe to use for retrieval operations.".to_string()
}
/// Builds the definition from this tool's name and description, reusing the
/// parameter schema of the wrapped memory's KIP function definition.
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description(),
parameters: self.memory.kip_function_definitions.parameters.clone(),
strict: Some(true),
}
}
/// Marks `request` via `readonly()` and executes it against the nexus.
///
/// NOTE(review): `readonly()` is presumably what rejects mutating commands —
/// confirm against `anda_kip::Request`.
async fn call(
&self,
_ctx: BaseCtx,
mut request: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
let (_, res) = request.readonly().execute(self.memory.nexus.as_ref()).await;
Ok(ToolOutput::new(res))
}
}
// Arguments of the `get_resource_content` tool.
// NOTE(review): plain `//` comments are used because `JsonSchema` turns doc
// comments into schema descriptions, which would change the generated schema.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct GetResourceContentArgs {
// Id of the stored resource to read.
pub _id: u64,
}
/// Tool that returns the content of a stored resource as text.
#[derive(Debug, Clone)]
pub struct GetResourceContentTool {
// Shared memory backend used to load resources.
memory: Arc<MemoryManagement>,
// JSON schema generated once for `GetResourceContentArgs`.
schema: Json,
}
impl GetResourceContentTool {
/// Name under which the tool is registered.
pub const NAME: &'static str = "get_resource_content";
/// Creates the tool and generates the argument schema up front.
pub fn new(memory: Arc<MemoryManagement>) -> Self {
let schema = gen_schema_for::<GetResourceContentArgs>();
Self { memory, schema }
}
}
impl Tool<BaseCtx> for GetResourceContentTool {
    type Args = GetResourceContentArgs;
    type Output = Response;

    /// Returns [`GetResourceContentTool::NAME`].
    fn name(&self) -> String {
        Self::NAME.to_string()
    }

    /// Returns the fixed description of the tool.
    fn description(&self) -> String {
        "Retrieves the full content of a stored resource by its ID. Returns the content as plain text if UTF-8 encoded, or as a base64url-encoded string for binary data. If the resource has no local blob but has a URI, it will be fetched from the remote source.".to_string()
    }

    /// Builds the definition from the name, description and cached schema.
    fn definition(&self) -> FunctionDefinition {
        FunctionDefinition {
            name: self.name(),
            description: self.description(),
            parameters: self.schema.clone(),
            strict: Some(true),
        }
    }

    /// Loads the resource and returns its content as text.
    ///
    /// A stored blob is returned as-is when it is valid UTF-8, otherwise in
    /// the string form of `ByteBufB64`. Without a blob the resource's URI is
    /// fetched as text; a resource with neither is an error.
    async fn call(
        &self,
        ctx: BaseCtx,
        args: Self::Args,
        _resources: Vec<Resource>,
    ) -> Result<ToolOutput<Self::Output>, BoxError> {
        let resource = self.memory.get_resource(args._id).await?;

        let text = if let Some(blob) = resource.blob {
            // Non UTF-8 bytes are recovered from the error and encoded.
            String::from_utf8(blob.0)
                .unwrap_or_else(|err| ByteBufB64(err.into_bytes()).to_string())
        } else if let Some(uri) = resource.uri {
            FetchWebResourcesTool::fetch_as_text(&ctx, &uri).await?
        } else {
            return Err(format!("Invalid resource {}, no blob or uri", args._id).into());
        };

        Ok(ToolOutput::new(Response::ok(text.into())))
    }
}
// Arguments of the `list_previous_conversations` tool.
// NOTE(review): plain `//` comments are used because `JsonSchema` turns doc
// comments into schema descriptions.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct ListConversationsArgs {
// Pagination cursor; an empty string (the default) means "first page".
#[serde(default)]
pub cursor: String,
// Page size; 0 (the default) means "use the listing's default".
#[serde(default)]
pub limit: usize,
}
/// Tool that lists the caller's previous conversations page by page.
#[derive(Debug, Clone)]
pub struct ListConversationsTool {
// Collection handle used for the listing.
conversations: Conversations,
// Description reported to the model; replaceable via `with_description`.
description: String,
}
impl ListConversationsTool {
/// Name under which the tool is registered.
pub const NAME: &'static str = "list_previous_conversations";
/// Creates the tool with its default description.
pub fn new(conversations: Conversations) -> Self {
Self { conversations, description: "Lists the current user's previous conversations in reverse chronological order with cursor-based pagination. Returns conversation metadata including status, timestamps, messages, and usage statistics. Use the cursor parameter to paginate through older conversations.".to_string() }
}
/// Replaces the description reported by the tool.
pub fn with_description(mut self, description: String) -> Self {
self.description = description;
self
}
}
impl Tool<BaseCtx> for ListConversationsTool {
    type Args = ListConversationsArgs;
    type Output = Response;

    /// Returns [`ListConversationsTool::NAME`].
    fn name(&self) -> String {
        Self::NAME.to_string()
    }

    /// Returns the configured description.
    fn description(&self) -> String {
        self.description.clone()
    }

    /// Builds the definition with a hand-written parameter schema for
    /// `cursor` and `limit`; neither parameter is required.
    fn definition(&self) -> FunctionDefinition {
        let parameters = json!({
            "type": "object",
            "properties": {
                "cursor": {
                    "type": "string",
                    "description": "The cursor for pagination, returned from the previous call. Use an empty string for the first page."
                },
                "limit": {
                    "type": "integer",
                    "description": "The maximum number of conversations to return, between 1 and 100. Default is 10.",
                    "default": 10,
                    "minimum": 1,
                    "maximum": 100
                }
            },
            "required": [],
            "additionalProperties": false
        });

        FunctionDefinition {
            name: self.name(),
            description: self.description.clone(),
            parameters,
            strict: Some(true),
        }
    }

    /// Lists the caller's conversations and returns them as documents
    /// together with the next cursor.
    ///
    /// An empty cursor and a zero limit are both treated as "not provided".
    async fn call(
        &self,
        ctx: BaseCtx,
        args: Self::Args,
        _resources: Vec<Resource>,
    ) -> Result<ToolOutput<Self::Output>, BoxError> {
        let limit = (args.limit != 0).then_some(args.limit);
        let cursor = (!args.cursor.is_empty()).then_some(args.cursor);

        let (conversations, next_cursor) = self
            .conversations
            .list_conversations_by_user(ctx.caller(), cursor, limit)
            .await?;

        let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
        let result = Documents::from(docs).to_string().into();
        Ok(ToolOutput::new(Response::Ok {
            result,
            next_cursor,
        }))
    }
}
// Arguments of the `search_conversations` tool.
// NOTE(review): plain `//` comments are used because `JsonSchema` turns doc
// comments into schema descriptions.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
pub struct SearchConversationsArgs {
// Full-text query string.
pub query: String,
// Maximum number of results; 0 (the default) means "not provided".
#[serde(default)]
pub limit: usize,
}
/// Tool that runs a full-text search over the caller's conversations.
#[derive(Debug, Clone)]
pub struct SearchConversationsTool {
// Collection handle used for the search.
conversations: Conversations,
// Description reported to the model; replaceable via `with_description`.
description: String,
}
impl SearchConversationsTool {
/// Name under which the tool is registered.
pub const NAME: &'static str = "search_conversations";
/// Creates the tool with its default description.
pub fn new(conversations: Conversations) -> Self {
Self { conversations, description: "Performs a full-text search across the current user's conversation history using a query string. Searches through messages, resources, and artifacts to find relevant past conversations. Use this to recall specific topics, instructions, or context from previous interactions.".to_string() }
}
/// Replaces the description reported by the tool.
pub fn with_description(mut self, description: String) -> Self {
self.description = description;
self
}
}
impl Tool<BaseCtx> for SearchConversationsTool {
    type Args = SearchConversationsArgs;
    type Output = Response;

    /// Returns [`SearchConversationsTool::NAME`].
    fn name(&self) -> String {
        Self::NAME.to_string()
    }

    /// Returns the configured description.
    fn description(&self) -> String {
        self.description.clone()
    }

    /// Builds the definition with a hand-written parameter schema; `query` is
    /// required, `limit` is optional.
    fn definition(&self) -> FunctionDefinition {
        FunctionDefinition {
            name: self.name(),
            description: self.description.clone(),
            parameters: json!({
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query string to search for in the conversation history."
                    },
                    "limit": {
                        "type": "integer",
                        "description": "The maximum number of conversations to return, between 1 and 100. Default is 10.",
                        "default": 10,
                        "minimum": 1,
                        "maximum": 100
                    }
                },
                "required": ["query"],
                "additionalProperties": false
            }),
            strict: Some(true),
        }
    }

    /// Searches the caller's conversations and returns the matches as
    /// documents.
    ///
    /// The limit follows the advertised schema: a missing (zero) limit falls
    /// back to 10 and larger values are capped at 100, the same bounds the
    /// conversation listing applies.
    async fn call(
        &self,
        ctx: BaseCtx,
        args: Self::Args,
        _resources: Vec<Resource>,
    ) -> Result<ToolOutput<Self::Output>, BoxError> {
        let limit = if args.limit == 0 {
            10
        } else {
            args.limit.min(100)
        };

        let conversations = self
            .conversations
            .search_conversations(ctx.caller(), args.query, Some(limit))
            .await?;

        let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
        Ok(ToolOutput::new(Response::Ok {
            result: Documents::from(docs).to_string().into(),
            next_cursor: None,
        }))
    }
}
// Operations accepted by the `memory_api` tool, selected by the `type` tag.
// NOTE(review): plain `//` comments are used because `JsonSchema` turns doc
// comments into schema descriptions.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum MemoryToolArgs {
// Load resource `_id`; `conversation` must be a conversation owned by the caller.
GetResource {
_id: u64,
conversation: u64,
},
// Load conversation `_id`.
GetConversation {
_id: u64,
},
// Cancel conversation `_id` if it is still submitted or working.
StopConversation {
_id: u64,
},
// Append `message` to the steering messages of conversation `_id`.
SteerConversation {
_id: u64,
message: String,
},
// Append `message` to the follow-up messages of conversation `_id`.
FollowUpConversation {
_id: u64,
message: String,
},
// Delete conversation `_id`.
DeleteConversation {
_id: u64,
},
// List the caller's conversations, paginated by `cursor` and `limit`.
ListPrevConversations {
cursor: Option<String>,
limit: Option<usize>,
},
// Full-text search over the caller's conversations.
SearchConversations {
query: String,
limit: Option<usize>,
},
}
/// Tool exposing conversation and resource management as a single API.
#[derive(Debug, Clone)]
pub struct MemoryTool {
// Shared memory backend that performs the operations.
memory: Arc<MemoryManagement>,
// JSON schema generated once for `MemoryToolArgs`.
schema: Json,
}
impl MemoryTool {
/// Name under which the tool is registered.
pub const NAME: &'static str = "memory_api";
/// Creates the tool and generates the argument schema up front.
pub fn new(memory: Arc<MemoryManagement>) -> Self {
let schema = gen_schema_for::<MemoryToolArgs>();
Self { memory, schema }
}
}
impl Tool<BaseCtx> for MemoryTool {
type Args = MemoryToolArgs;
type Output = Response;
fn name(&self) -> String {
Self::NAME.to_string()
}
fn description(&self) -> String {
"A unified API for managing conversations and memory. Supports retrieving resources and conversation details, stopping or steering in-progress conversations, sending follow-up messages, deleting conversations, listing previous conversations with pagination, searching conversation history by keyword, and listing KIP command logs.".to_string()
}
fn definition(&self) -> FunctionDefinition {
FunctionDefinition {
name: self.name(),
description: self.description(),
parameters: self.schema.clone(),
strict: Some(true),
}
}
async fn call(
&self,
ctx: BaseCtx,
args: Self::Args,
_resources: Vec<Resource>,
) -> Result<ToolOutput<Self::Output>, BoxError> {
match args {
MemoryToolArgs::GetResource { _id, conversation } => {
let conversation = self.memory.get_conversation(conversation).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let mut res = self.memory.get_resource(_id).await?;
if res.blob.is_none()
&& let Some(uri) = &res.uri
{
res.blob = FetchWebResourcesTool::fetch_as_bytes(&ctx, uri).await.ok();
}
Ok(ToolOutput::new(Response::Ok {
result: json!(res),
next_cursor: None,
}))
}
MemoryToolArgs::GetConversation { _id } => {
let conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::StopConversation { _id } => {
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
if conversation.status == ConversationStatus::Working
|| conversation.status == ConversationStatus::Submitted
{
conversation.status = ConversationStatus::Cancelled;
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
(
"status".to_string(),
Fv::Text(conversation.status.to_string()),
),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
}
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::SteerConversation { _id, message } => {
if message.trim().is_empty() {
return Err("steering message cannot be empty".into());
}
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let steering_messages = if let Some(msg) = conversation.steering_messages.clone() {
let mut msgs = msg;
msgs.push(message.clone());
msgs
} else {
vec![message.clone()]
};
conversation.steering_messages = Some(steering_messages.clone());
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
("steering_messages".to_string(), steering_messages.into()),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::FollowUpConversation { _id, message } => {
if message.trim().is_empty() {
return Err("follow-up message cannot be empty".into());
}
let mut conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let follow_up_messages = if let Some(msg) = conversation.follow_up_messages.clone()
{
let mut msgs = msg;
msgs.push(message.clone());
msgs
} else {
vec![message.clone()]
};
conversation.follow_up_messages = Some(follow_up_messages.clone());
conversation.updated_at = unix_ms();
let changes = BTreeMap::from([
("follow_up_messages".to_string(), follow_up_messages.into()),
("updated_at".to_string(), Fv::U64(conversation.updated_at)),
]);
self.memory.update_conversation(_id, changes).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversation),
next_cursor: None,
}))
}
MemoryToolArgs::DeleteConversation { _id } => {
let conversation = self.memory.get_conversation(_id).await?;
if &conversation.user != ctx.caller() {
return Err("permission denied".into());
}
let deleted = self.memory.delete_conversation(_id).await?;
Ok(ToolOutput::new(Response::Ok {
result: json!({ "deleted": deleted }),
next_cursor: None,
}))
}
MemoryToolArgs::ListPrevConversations { cursor, limit } => {
let (conversations, next_cursor) = self
.memory
.list_conversations_by_user(ctx.caller(), cursor, limit)
.await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversations),
next_cursor,
}))
}
MemoryToolArgs::SearchConversations { query, limit } => {
let conversations = self
.memory
.search_conversations(ctx.caller(), query, limit)
.await?;
Ok(ToolOutput::new(Response::Ok {
result: json!(conversations),
next_cursor: None,
}))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
/// Checks that the conversation status serializes in lowercase and
/// round-trips through JSON, and that `MemoryToolArgs` is tagged by `type`.
#[test]
fn test_conversation_status() {
let chat = Conversation {
status: ConversationStatus::Completed,
..Default::default()
};
let rt = ConversationStatus::Completed;
println!("{}", rt);
// The status must appear as a lowercase string in the serialized form.
let rt = serde_json::to_string(&chat).unwrap();
assert!(rt.contains(r#","status":"completed","#));
let chat2: Conversation = serde_json::from_str(&rt).unwrap();
assert_eq!(chat.status, chat2.status);
// Tool arguments are internally tagged with the variant name.
let args = MemoryToolArgs::GetConversation { _id: 1 };
let rt = serde_json::to_string(&args).unwrap();
assert_eq!(rt, r#"{"type":"GetConversation","_id":1}"#);
let args1: MemoryToolArgs = serde_json::from_str(&rt).unwrap();
assert_eq!(args, args1);
}
}