Skip to main content

anda_engine/
memory.rs

1//! Persistent memory tools and conversation storage.
2//!
3//! This module stores conversations, resources, artifacts, and memory-management
4//! commands for agents. Conversation metadata is stored in AndaDB collections
5//! with BTree and BM25 indexes, while higher-level memory operations are exposed
6//! through KIP (Knowledge Interaction Protocol) tools backed by the Cognitive
7//! Nexus.
8
9use anda_cognitive_nexus::{CognitiveNexus, ConceptPK};
10use anda_core::{
11    BoxError, ContentPart, Document, Documents, FunctionDefinition, Message, Resource, ResourceRef,
12    StateFeatures, Tool, ToolOutput, Usage, Xid, gen_schema_for,
13};
14use anda_db::{
15    collection::{Collection, CollectionConfig},
16    database::AndaDB,
17    error::DBError,
18    index::BTree,
19    query::{Filter, Query, RangeQuery, Search},
20};
21use anda_db_schema::{
22    AndaDBSchema, FieldEntry, FieldKey, FieldType, Ft, Fv, Json, Schema, SchemaError,
23};
24use anda_db_tfs::jieba_tokenizer;
25use anda_kip::{
26    DescribeTarget, KipError, META_SYSTEM_NAME, MetaCommand, PERSON_TYPE, Request, Response,
27};
28use candid::Principal;
29use ciborium::cbor;
30use ic_auth_types::ByteBufB64;
31use schemars::JsonSchema;
32use serde::{Deserialize, Serialize};
33use serde_json::{Map, json};
34use std::{
35    collections::{BTreeMap, HashMap},
36    fmt,
37    sync::{Arc, LazyLock},
38};
39
40use crate::{context::BaseCtx, extension::fetch::FetchWebResourcesTool, rfc3339_datetime, unix_ms};
41
42pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
43    serde_json::from_value(json!({
44        "name": "execute_kip",
45        "description": "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to interact with your persistent memory.",
46        "parameters": {
47            "type": "object",
48            "properties": {
49                "commands": {
50                    "type": "array",
51                    "description": "An array of KIP commands for batch execution (reduces round-trips). Commands are executed sequentially; execution stops on first KML error.",
52                    "items": {
53                        "type": "string"
54                    }
55                },
56                "parameters": {
57                    "type": "object",
58                    "description": "An optional JSON object of key-value pairs used for safe substitution of placeholders in the command string(s). Placeholders should start with ':' (e.g., :name, :limit). IMPORTANT: A placeholder must represent a complete JSON value token (e.g., name: :name). Do not embed placeholders inside quoted strings (e.g., \"Hello :name\"), because substitution uses JSON serialization."
59                },
60            },
61            "required": ["commands", "parameters"],
62            "additionalProperties": false
63        }
64    })).unwrap()
65});
66
67/// Conversation record stored in the memory database.
68///
69/// Schema version: 4
70#[derive(Debug, Clone, Deserialize, Serialize, AndaDBSchema)]
71pub struct Conversation {
72    /// Unique collection identifier assigned by AndaDB.
73    pub _id: u64,
74
75    /// Principal that owns the conversation.
76    #[field_type = "Bytes"]
77    pub user: Principal,
78
79    /// Optional thread identifier used to group related conversation turns.
80    #[field_type = "Option<Bytes>"]
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub thread: Option<Xid>,
83
84    /// Serialized chat messages accumulated for the conversation.
85    pub messages: Vec<Json>,
86
87    /// The request resources used by the agent to process the conversation.
88    pub resources: Vec<Resource>,
89
90    /// A collection of artifacts generated by the agent during the execution of the task.
91    pub artifacts: Vec<Resource>,
92
93    /// Current lifecycle state of the conversation.
94    #[field_type = "Text"]
95    pub status: ConversationStatus,
96
97    /// Failure reason recorded when the conversation did not complete.
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub failed_reason: Option<String>,
100
101    /// The LLM usage statistics for the conversation.
102    #[field_type = "Map<String, U64>"]
103    pub usage: Usage,
104
105    /// Messages queued to interrupt the agent mid-run.
106    ///
107    /// They are delivered after the current tool execution and skip remaining
108    /// pending tools.
109    #[serde(skip_serializing_if = "Option::is_none")]
110    pub steering_messages: Option<Vec<String>>,
111
112    /// Follow-up messages processed after the agent becomes idle.
113    ///
114    /// They are delivered only when there are no pending tool calls or steering
115    /// messages.
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub follow_up_messages: Option<Vec<String>>,
118
119    /// The child conversation ID, if this conversation has been continued. Should not be updated after set.
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub child: Option<u64>,
122
123    /// The ancestor conversation IDs, ordered from root to parent.
124    /// Should not be updated after creation.
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub ancestors: Option<Vec<u64>>,
127
128    /// An optional label for the conversation, which can be used for categorization or retrieval.
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub label: Option<String>,
131
132    /// Extra information for future extensions.
133    ///
134    /// This field is not indexed and should not be used for filtering or
135    /// searching.
136    #[serde(skip_serializing_if = "Option::is_none")]
137    pub extra: Option<Json>,
138
139    /// The period when the conversation was created, in hours (timestamp / 3600 / 1000).
140    /// It is used to index the conversation for faster retrieval by time.
141    pub period: u64,
142
143    /// The timestamp when the conversation was created, in milliseconds.
144    pub created_at: u64,
145
146    /// The timestamp when the conversation was updated, in milliseconds.
147    pub updated_at: u64,
148}
149
150impl Default for Conversation {
151    fn default() -> Self {
152        Self {
153            _id: 0,
154            user: Principal::anonymous(),
155            thread: None,
156            messages: Vec::new(),
157            resources: Vec::new(),
158            artifacts: Vec::new(),
159            status: ConversationStatus::default(),
160            failed_reason: None,
161            usage: Usage::default(),
162            steering_messages: None,
163            follow_up_messages: None,
164            child: None,
165            ancestors: None,
166            label: None,
167            extra: None,
168            period: 0,
169            created_at: 0,
170            updated_at: 0,
171        }
172    }
173}
174
175impl Conversation {
176    pub fn append_messages(&mut self, message: Vec<Message>) {
177        self.messages.extend(message.into_iter().map(|v| json!(v)));
178    }
179
180    pub fn to_changes(&self) -> Result<BTreeMap<String, Fv>, BoxError> {
181        let mut changes = BTreeMap::from([
182            (
183                "messages".to_string(),
184                Fv::array_from(cbor!(self.messages).unwrap(), &[Ft::Json])?,
185            ),
186            (
187                "resources".to_string(),
188                Fv::array_from(cbor!(self.resources).unwrap(), &[Resource::field_type()])?,
189            ),
190            (
191                "artifacts".to_string(),
192                Fv::array_from(cbor!(self.artifacts).unwrap(), &[Resource::field_type()])?,
193            ),
194            ("status".to_string(), Fv::Text(self.status.to_string())),
195            (
196                "usage".to_string(),
197                Fv::map_from(
198                    cbor!(self.usage).unwrap(),
199                    &BTreeMap::from([("*".into(), Ft::U64)]),
200                )?,
201            ),
202            ("updated_at".to_string(), Fv::U64(self.updated_at)),
203            (
204                "steering_messages".to_string(),
205                if let Some(msg) = self.steering_messages.clone() {
206                    msg.into()
207                } else {
208                    Fv::Null
209                },
210            ),
211            (
212                "follow_up_messages".to_string(),
213                if let Some(msg) = self.follow_up_messages.clone() {
214                    msg.into()
215                } else {
216                    Fv::Null
217                },
218            ),
219            (
220                "label".to_string(),
221                if let Some(label) = self.label.clone() {
222                    label.into()
223                } else {
224                    Fv::Null
225                },
226            ),
227            (
228                "extra".to_string(),
229                if let Some(extra) = self.extra.clone() {
230                    extra.into()
231                } else {
232                    Fv::Null
233                },
234            ),
235        ]);
236
237        if let Some(child) = self.child {
238            changes.insert("child".to_string(), Fv::U64(child));
239        }
240        if let Some(reason) = &self.failed_reason {
241            changes.insert("failed_reason".to_string(), Fv::Text(reason.clone()));
242        }
243        Ok(changes)
244    }
245
246    pub fn to_delta(&self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
247        ConversationDelta {
248            _id: self._id,
249            messages: self
250                .messages
251                .iter()
252                .skip(messages_offset)
253                .cloned()
254                .collect(),
255            artifacts: self
256                .artifacts
257                .iter()
258                .skip(artifacts_offset)
259                .cloned()
260                .collect(),
261            status: self.status.clone(),
262            usage: self.usage.clone(),
263            failed_reason: self.failed_reason.clone(),
264            updated_at: self.updated_at,
265            child: self.child,
266        }
267    }
268
269    pub fn into_delta(self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
270        ConversationDelta {
271            _id: self._id,
272            messages: self.messages.into_iter().skip(messages_offset).collect(),
273            artifacts: self.artifacts.into_iter().skip(artifacts_offset).collect(),
274            status: self.status,
275            usage: self.usage,
276            failed_reason: self.failed_reason,
277            updated_at: self.updated_at,
278            child: self.child,
279        }
280    }
281}
282
283#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)]
284pub struct PrunedMessage {
285    pub role: String,
286
287    pub content: Vec<ContentPart>,
288
289    #[serde(skip_serializing_if = "Option::is_none")]
290    pub name: Option<String>,
291
292    #[serde(skip_serializing_if = "Option::is_none")]
293    pub user: Option<String>,
294
295    #[serde(skip_serializing_if = "Option::is_none")]
296    pub timestamp: Option<String>,
297}
298
299impl PrunedMessage {
300    pub fn try_from(mut msg: Message) -> Option<Self> {
301        msg.prune_content();
302        Some(Self {
303            role: msg.role,
304            content: msg.content,
305            name: msg.name,
306            user: msg.user.map(|u| u.to_string()),
307            timestamp: msg.timestamp.and_then(rfc3339_datetime),
308        })
309    }
310}
311
312impl From<Conversation> for Document {
313    fn from(conversation: Conversation) -> Self {
314        let mut metadata = BTreeMap::from([
315            ("_id".to_string(), conversation._id.into()),
316            ("type".to_string(), "Conversation".into()),
317            ("user".to_string(), conversation.user.to_string().into()),
318            ("status".to_string(), conversation.status.to_string().into()),
319            (
320                "created_at".to_string(),
321                rfc3339_datetime(conversation.created_at).unwrap().into(),
322            ),
323            (
324                "updated_at".to_string(),
325                rfc3339_datetime(conversation.updated_at).unwrap().into(),
326            ),
327        ]);
328        if let Some(thread) = conversation.thread {
329            metadata.insert("thread".to_string(), thread.to_string().into());
330        }
331        if let Some(label) = conversation.label {
332            metadata.insert("label".to_string(), label.into());
333        }
334        let message: Vec<PrunedMessage> = conversation
335            .messages
336            .iter()
337            .filter_map(|v| {
338                serde_json::from_value::<Message>(v.clone())
339                    .ok()
340                    .and_then(PrunedMessage::try_from)
341            })
342            .collect();
343        Self {
344            content: serde_json::to_value(message).unwrap_or_default(),
345            metadata,
346        }
347    }
348}
349
350#[derive(Debug, Serialize)]
351pub struct ConversationRef<'a> {
352    pub _id: u64,
353    pub user: &'a Principal,
354    pub thread: Option<&'a Xid>,
355    pub messages: &'a [Json],
356    pub resources: &'a [Resource],
357    pub artifacts: &'a [Resource],
358    pub status: &'a ConversationStatus,
359    pub usage: &'a Usage,
360    #[serde(skip_serializing_if = "Option::is_none")]
361    pub steering_messages: &'a Option<Vec<String>>,
362    #[serde(skip_serializing_if = "Option::is_none")]
363    pub follow_up_messages: &'a Option<Vec<String>>,
364    #[serde(skip_serializing_if = "Option::is_none")]
365    pub label: &'a Option<String>,
366    pub extra: &'a Option<Json>,
367    pub period: u64,
368    pub created_at: u64,
369    pub updated_at: u64,
370    #[serde(skip_serializing_if = "Option::is_none")]
371    pub child: &'a Option<u64>,
372    #[serde(skip_serializing_if = "Option::is_none")]
373    pub ancestors: &'a Option<Vec<u64>>,
374}
375
376impl<'a> From<&'a Conversation> for ConversationRef<'a> {
377    fn from(conversation: &'a Conversation) -> Self {
378        Self {
379            _id: conversation._id,
380            user: &conversation.user,
381            thread: conversation.thread.as_ref(),
382            messages: &conversation.messages,
383            resources: &conversation.resources,
384            artifacts: &conversation.artifacts,
385            status: &conversation.status,
386            usage: &conversation.usage,
387            steering_messages: &conversation.steering_messages,
388            follow_up_messages: &conversation.follow_up_messages,
389            label: &conversation.label,
390            extra: &conversation.extra,
391            period: conversation.period,
392            created_at: conversation.created_at,
393            updated_at: conversation.updated_at,
394            child: &conversation.child,
395            ancestors: &conversation.ancestors,
396        }
397    }
398}
399
400#[derive(Debug, Clone, Deserialize, Serialize)]
401pub struct ConversationState {
402    pub _id: u64,
403    pub status: ConversationStatus,
404}
405
406impl From<&ConversationRef<'_>> for ConversationState {
407    fn from(conversation: &ConversationRef<'_>) -> Self {
408        Self {
409            _id: conversation._id,
410            status: conversation.status.clone(),
411        }
412    }
413}
414
415impl From<&Conversation> for ConversationState {
416    fn from(conversation: &Conversation) -> Self {
417        Self {
418            _id: conversation._id,
419            status: conversation.status.clone(),
420        }
421    }
422}
423
424/// A delta of a conversation since a given offset, used for incremental fetching of conversation messages.
425#[derive(Debug, Clone, Deserialize, Serialize)]
426pub struct ConversationDelta {
427    pub _id: u64,
428    /// The new messages since the given offset. The offset is determined by the client and is not stored in the database. It is used to support incremental fetching of conversation messages.
429    pub messages: Vec<Json>,
430    pub artifacts: Vec<Resource>,
431    pub status: ConversationStatus,
432    pub usage: Usage,
433    pub failed_reason: Option<String>,
434    pub updated_at: u64,
435    pub child: Option<u64>,
436}
437
438#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
439#[serde(rename_all = "lowercase")]
440pub enum ConversationStatus {
441    #[default]
442    Submitted,
443    Working,
444    Idle,
445    Completed,
446    Cancelled,
447    Failed,
448}
449
450impl fmt::Display for ConversationStatus {
451    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452        match self {
453            ConversationStatus::Submitted => write!(f, "submitted"),
454            ConversationStatus::Working => write!(f, "working"),
455            ConversationStatus::Idle => write!(f, "idle"),
456            ConversationStatus::Completed => write!(f, "completed"),
457            ConversationStatus::Cancelled => write!(f, "cancelled"),
458            ConversationStatus::Failed => write!(f, "failed"),
459        }
460    }
461}
462
463#[derive(Debug, Clone)]
464pub struct Conversations {
465    pub conversations: Arc<Collection>,
466}
467
468impl Conversations {
469    pub async fn connect(db: Arc<AndaDB>, name: String) -> Result<Self, BoxError> {
470        let mut schema = Conversation::schema()?;
471        schema.with_version(4);
472
473        let conversations = db
474            .open_or_create_collection(
475                schema,
476                CollectionConfig {
477                    name,
478                    description: "conversations collection".to_string(),
479                },
480                async |collection| {
481                    // set tokenizer
482                    collection.set_tokenizer(jieba_tokenizer());
483                    // create BTree indexes if not exists
484                    collection.create_btree_index_nx(&["user"]).await?;
485                    collection.create_btree_index_nx(&["thread"]).await?;
486                    collection.create_btree_index_nx(&["period"]).await?;
487                    collection
488                        .create_bm25_index_nx(&["messages", "resources", "artifacts"])
489                        .await?;
490
491                    Ok::<(), DBError>(())
492                },
493            )
494            .await?;
495
496        Ok(Self { conversations })
497    }
498
499    pub async fn add_conversation(
500        &self,
501        conversation: ConversationRef<'_>,
502    ) -> Result<u64, DBError> {
503        let id = self.conversations.add_from(&conversation).await?;
504        self.conversations.flush(unix_ms()).await?;
505        Ok(id)
506    }
507
508    pub async fn update_conversation(
509        &self,
510        id: u64,
511        fields: BTreeMap<String, Fv>,
512    ) -> Result<(), DBError> {
513        self.conversations.update(id, fields).await?;
514        self.conversations.flush(unix_ms()).await?;
515        Ok(())
516    }
517
518    pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
519        self.conversations.get_as(id).await
520    }
521
522    pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
523        let doc = self.conversations.remove(id).await?;
524        self.conversations.flush(unix_ms()).await?;
525        Ok(doc.is_some())
526    }
527
528    pub async fn batch_get_conversations(
529        &self,
530        user: &Principal,
531        ids: Vec<u64>,
532    ) -> Result<Vec<Conversation>, BoxError> {
533        let filter = Some(Filter::And(vec![
534            Box::new(Filter::Field((
535                "_id".to_string(),
536                RangeQuery::Include(ids.into_iter().map(Fv::U64).collect()),
537            ))),
538            Box::new(Filter::Field((
539                "user".to_string(),
540                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
541            ))),
542        ]));
543
544        let rt: Vec<Conversation> = self
545            .conversations
546            .search_as(Query {
547                search: None,
548                filter,
549                limit: None,
550            })
551            .await?;
552        Ok(rt)
553    }
554
555    pub async fn list_conversations_by_user(
556        &self,
557        user: &Principal,
558        cursor: Option<String>,
559        limit: Option<usize>,
560    ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
561        let limit = limit.unwrap_or(10).min(100);
562        let cursor = match BTree::from_cursor::<u64>(&cursor)? {
563            Some(cursor) => cursor,
564            None => self.conversations.max_document_id() + 1,
565        };
566        let filter = Some(Filter::And(vec![
567            Box::new(Filter::Field((
568                "user".to_string(),
569                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
570            ))),
571            Box::new(Filter::Field((
572                "_id".to_string(),
573                RangeQuery::Lt(Fv::U64(cursor)),
574            ))),
575        ]));
576
577        let rt: Vec<Conversation> = self
578            .conversations
579            .search_as(Query {
580                search: None,
581                filter,
582                limit: Some(limit),
583            })
584            .await?;
585        let cursor = if rt.len() >= limit {
586            BTree::to_cursor(&rt.first().unwrap()._id)
587        } else {
588            None
589        };
590        Ok((rt, cursor))
591    }
592
593    pub async fn search_conversations(
594        &self,
595        user: &Principal,
596        query: String,
597        limit: Option<usize>,
598    ) -> Result<Vec<Conversation>, BoxError> {
599        let rt = self
600            .conversations
601            .search_as(Query {
602                search: Some(Search {
603                    text: Some(query.to_string()),
604                    logical_search: true,
605                    ..Default::default()
606                }),
607                filter: Some(Filter::Field((
608                    "user".to_string(),
609                    RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
610                ))),
611                limit,
612            })
613            .await?;
614        Ok(rt)
615    }
616
617    pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
618        let period = timestamp / 3600 / 1000;
619        let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
620        let ids = self
621            .conversations
622            .search_ids(Query {
623                search: None,
624                filter: Some(filter),
625                limit: None,
626            })
627            .await?;
628        let count = ids.len() as u64;
629        for id in ids {
630            let _ = self.conversations.remove(id).await;
631        }
632        let now_ms = unix_ms();
633        self.conversations.flush(now_ms).await?;
634        Ok(count)
635    }
636}
637
638#[derive(Debug, Clone)]
639pub struct MemoryManagement {
640    pub nexus: Arc<CognitiveNexus>,
641    pub conversations: Arc<Collection>,
642    pub resources: Arc<Collection>,
643    pub kip_function_definitions: FunctionDefinition,
644}
645
646impl MemoryManagement {
647    pub async fn connect(db: Arc<AndaDB>, nexus: Arc<CognitiveNexus>) -> Result<Self, BoxError> {
648        let mut schema = Conversation::schema()?;
649        schema.with_version(4);
650
651        let conversations = db
652            .open_or_create_collection(
653                schema,
654                CollectionConfig {
655                    name: "conversations".to_string(),
656                    description: "conversations collection".to_string(),
657                },
658                async |collection| {
659                    // set tokenizer
660                    collection.set_tokenizer(jieba_tokenizer());
661                    // create BTree indexes if not exists
662                    collection.create_btree_index_nx(&["user"]).await?;
663                    collection.create_btree_index_nx(&["thread"]).await?;
664                    collection.create_btree_index_nx(&["period"]).await?;
665                    collection
666                        .create_bm25_index_nx(&["messages", "resources", "artifacts"])
667                        .await?;
668
669                    Ok::<(), DBError>(())
670                },
671            )
672            .await?;
673
674        let schema = Resource::schema()?;
675        let resources = db
676            .open_or_create_collection(
677                schema,
678                CollectionConfig {
679                    name: "resources".to_string(),
680                    description: "Resources collection".to_string(),
681                },
682                async |collection| {
683                    // set tokenizer
684                    collection.set_tokenizer(jieba_tokenizer());
685                    // create BTree indexes if not exists
686                    collection.create_btree_index_nx(&["tags"]).await?;
687                    collection.create_btree_index_nx(&["hash"]).await?;
688                    collection.create_btree_index_nx(&["mime_type"]).await?;
689                    collection
690                        .create_bm25_index_nx(&["name", "description", "metadata"])
691                        .await?;
692
693                    Ok::<(), DBError>(())
694                },
695            )
696            .await?;
697
698        Ok(Self {
699            nexus,
700            conversations,
701            resources,
702            kip_function_definitions: FUNCTION_DEFINITION.clone(),
703        })
704    }
705
706    pub fn with_kip_function_definitions(mut self, def: FunctionDefinition) -> Self {
707        self.kip_function_definitions = def;
708        self
709    }
710
711    pub fn nexus(&self) -> Arc<CognitiveNexus> {
712        self.nexus.clone()
713    }
714
715    pub fn max_conversation_id(&self) -> u64 {
716        self.conversations.max_document_id()
717    }
718
719    pub async fn describe_primer(&self) -> Result<Json, KipError> {
720        let (primer, _) = self
721            .nexus
722            .execute_meta(MetaCommand::Describe(DescribeTarget::Primer))
723            .await?;
724        Ok(primer)
725    }
726
727    pub async fn describe_system(&self) -> Result<Json, KipError> {
728        let system = self
729            .nexus
730            .get_concept(&ConceptPK::Object {
731                r#type: PERSON_TYPE.to_string(),
732                name: META_SYSTEM_NAME.to_string(),
733            })
734            .await?;
735        let (domains, _) = self
736            .nexus
737            .execute_meta(MetaCommand::Describe(DescribeTarget::Domains))
738            .await?;
739        Ok(json!({
740            "identity": system.to_concept_node(),
741            "domains": domains,
742        }))
743    }
744
745    pub async fn describe_caller(&self, id: &Principal) -> Result<Json, KipError> {
746        let user = self
747            .nexus
748            .get_concept(&ConceptPK::Object {
749                r#type: PERSON_TYPE.to_string(),
750                name: id.to_string(),
751            })
752            .await?;
753
754        Ok(user.to_concept_node())
755    }
756
757    pub async fn get_or_init_caller(
758        &self,
759        id: &Principal,
760        name: Option<String>,
761    ) -> Result<Json, KipError> {
762        let mut attributes = Map::new();
763        let mut metadata = Map::new();
764        attributes.insert("id".to_string(), id.to_string().into());
765        attributes.insert("person_class".to_string(), "Human".into());
766        if let Some(name) = name {
767            attributes.insert("name".to_string(), name.into());
768        }
769        metadata.insert("author".to_string(), "$system".into());
770        metadata.insert("status".to_string(), "active".into());
771        let user = self
772            .nexus
773            .get_or_init_concept(
774                PERSON_TYPE.to_string(),
775                id.to_string(),
776                attributes,
777                metadata,
778            )
779            .await?;
780
781        Ok(user.to_concept_node())
782    }
783
784    pub async fn add_resource(&self, resource: ResourceRef<'_>) -> Result<u64, DBError> {
785        let id = self.resources.add_from(&resource).await?;
786        self.resources.flush(unix_ms()).await?;
787        Ok(id)
788    }
789
790    pub async fn try_add_resources(
791        &self,
792        resources: &[Resource],
793    ) -> Result<Vec<Resource>, BoxError> {
794        let mut rs: Vec<Resource> = Vec::with_capacity(resources.len());
795        let mut count = 0;
796        for r in resources.iter() {
797            let rf: ResourceRef = r.into();
798            let id = if r._id > 0 {
799                r._id // TODO: check if the resource exists and has permission
800            } else {
801                match self.resources.add_from(&rf).await {
802                    Ok(id) => {
803                        count += 1;
804                        id
805                    }
806                    Err(DBError::AlreadyExists { _id, .. }) => _id,
807                    Err(err) => Err(err)?,
808                }
809            };
810
811            let r2 = Resource {
812                _id: id,
813                blob: None,
814                ..r.clone()
815            };
816            rs.push(r2)
817        }
818
819        if count > 0 {
820            self.resources.flush(unix_ms()).await?;
821        }
822
823        Ok(rs)
824    }
825
826    pub async fn get_resource(&self, id: u64) -> Result<Resource, DBError> {
827        self.resources.get_as(id).await
828    }
829
830    pub async fn add_conversation(
831        &self,
832        conversation: ConversationRef<'_>,
833    ) -> Result<u64, DBError> {
834        let id = self.conversations.add_from(&conversation).await?;
835        self.conversations.flush(unix_ms()).await?;
836        Ok(id)
837    }
838
839    pub async fn update_conversation(
840        &self,
841        id: u64,
842        fields: BTreeMap<String, Fv>,
843    ) -> Result<(), DBError> {
844        self.conversations.update(id, fields).await?;
845        self.conversations.flush(unix_ms()).await?;
846        Ok(())
847    }
848
849    pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
850        self.conversations.get_as(id).await
851    }
852
853    pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
854        let doc = self.conversations.remove(id).await?;
855        self.conversations.flush(unix_ms()).await?;
856        Ok(doc.is_some())
857    }
858
859    pub async fn list_conversations_by_user(
860        &self,
861        user: &Principal,
862        cursor: Option<String>,
863        limit: Option<usize>,
864    ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
865        let limit = limit.unwrap_or(10).min(100);
866        let cursor = match BTree::from_cursor::<u64>(&cursor)? {
867            Some(cursor) => cursor,
868            None => self.conversations.max_document_id() + 1,
869        };
870        let filter = Some(Filter::And(vec![
871            Box::new(Filter::Field((
872                "user".to_string(),
873                RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
874            ))),
875            Box::new(Filter::Field((
876                "_id".to_string(),
877                RangeQuery::Lt(Fv::U64(cursor)),
878            ))),
879        ]));
880
881        let rt: Vec<Conversation> = self
882            .conversations
883            .search_as(Query {
884                search: None,
885                filter,
886                limit: Some(limit),
887            })
888            .await?;
889        let cursor = if rt.len() >= limit {
890            BTree::to_cursor(&rt.first().unwrap()._id)
891        } else {
892            None
893        };
894        Ok((rt, cursor))
895    }
896
897    pub async fn search_conversations(
898        &self,
899        user: &Principal,
900        query: String,
901        limit: Option<usize>,
902    ) -> Result<Vec<Conversation>, BoxError> {
903        let rt = self
904            .conversations
905            .search_as(Query {
906                search: Some(Search {
907                    text: Some(query.to_string()),
908                    logical_search: true,
909                    ..Default::default()
910                }),
911                filter: Some(Filter::Field((
912                    "user".to_string(),
913                    RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
914                ))),
915                limit,
916            })
917            .await?;
918        Ok(rt)
919    }
920
921    pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
922        let period = timestamp / 3600 / 1000;
923        let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
924        let ids = self
925            .conversations
926            .search_ids(Query {
927                search: None,
928                filter: Some(filter),
929                limit: None,
930            })
931            .await?;
932        let count = ids.len() as u64;
933        for id in ids {
934            if let Ok(Some(doc)) = self.conversations.remove(id).await
935                && let Ok(conversation) = doc.try_into::<Conversation>()
936            {
937                for resource in conversation.resources {
938                    if resource._id > 0 {
939                        let _ = self.resources.remove(resource._id).await;
940                    }
941                }
942
943                for artifact in conversation.artifacts {
944                    if artifact._id > 0 {
945                        let _ = self.resources.remove(artifact._id).await;
946                    }
947                }
948            };
949        }
950
951        let now_ms = unix_ms();
952        self.conversations.flush(now_ms).await?;
953        self.resources.flush(now_ms).await?;
954        Ok(count)
955    }
956}
957
958/// KIP tool for memory management
959impl Tool<BaseCtx> for MemoryManagement {
960    type Args = Request;
961    type Output = Response;
962
963    fn name(&self) -> String {
964        self.kip_function_definitions.name.clone()
965    }
966
967    fn description(&self) -> String {
968        self.kip_function_definitions.description.clone()
969    }
970
971    fn definition(&self) -> FunctionDefinition {
972        self.kip_function_definitions.clone()
973    }
974
975    async fn call(
976        &self,
977        _ctx: BaseCtx,
978        request: Self::Args,
979        _resources: Vec<Resource>,
980    ) -> Result<ToolOutput<Self::Output>, BoxError> {
981        let (_, res) = request.execute(self.nexus.as_ref()).await;
982        Ok(ToolOutput {
983            is_error: if matches!(res, Response::Err { .. }) {
984                Some(true)
985            } else {
986                None
987            },
988            output: res,
989            artifacts: Vec::new(),
990            usage: Usage::default(),
991            tools_usage: HashMap::new(),
992        })
993    }
994}
995
996/// A read-only version of the KIP tool for memory management, which does not allow any modifications to the memory and is safe to use for retrieval operations.
997#[derive(Debug, Clone)]
998pub struct MemoryReadonly {
999    memory: Arc<MemoryManagement>,
1000}
1001
1002impl MemoryReadonly {
1003    pub const NAME: &'static str = "execute_kip_readonly";
1004
1005    /// Creates a new MemoryReadonly instance
1006    pub fn new(memory: Arc<MemoryManagement>) -> Self {
1007        Self { memory }
1008    }
1009}
1010
1011impl Tool<BaseCtx> for MemoryReadonly {
1012    type Args = Request;
1013    type Output = Response;
1014
1015    fn name(&self) -> String {
1016        Self::NAME.to_string()
1017    }
1018
1019    fn description(&self) -> String {
1020        "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to read from your persistent memory. This tool does not allow any modifications to the memory and is safe to use for retrieval operations.".to_string()
1021    }
1022
1023    fn definition(&self) -> FunctionDefinition {
1024        FunctionDefinition {
1025            name: self.name(),
1026            description: self.description(),
1027            parameters: self.memory.kip_function_definitions.parameters.clone(),
1028            strict: Some(true),
1029        }
1030    }
1031
1032    async fn call(
1033        &self,
1034        _ctx: BaseCtx,
1035        mut request: Self::Args,
1036        _resources: Vec<Resource>,
1037    ) -> Result<ToolOutput<Self::Output>, BoxError> {
1038        let (_, res) = request.readonly().execute(self.memory.nexus.as_ref()).await;
1039        Ok(ToolOutput {
1040            is_error: if matches!(res, Response::Err { .. }) {
1041                Some(true)
1042            } else {
1043                None
1044            },
1045            output: res,
1046            artifacts: Vec::new(),
1047            usage: Usage::default(),
1048            tools_usage: HashMap::new(),
1049        })
1050    }
1051}
1052
1053/// Arguments for "get_resource_content" tool
1054#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1055pub struct GetResourceContentArgs {
1056    /// The ID of the resource to get
1057    pub _id: u64,
1058}
1059
1060#[derive(Debug, Clone)]
1061pub struct GetResourceContentTool {
1062    memory: Arc<MemoryManagement>,
1063    schema: Json,
1064}
1065
1066impl GetResourceContentTool {
1067    pub const NAME: &'static str = "get_resource_content";
1068
1069    /// Creates a new GetResourceContentTool instance
1070    pub fn new(memory: Arc<MemoryManagement>) -> Self {
1071        let schema = gen_schema_for::<GetResourceContentArgs>();
1072        Self { memory, schema }
1073    }
1074}
1075
1076impl Tool<BaseCtx> for GetResourceContentTool {
1077    type Args = GetResourceContentArgs;
1078    type Output = Response;
1079
1080    fn name(&self) -> String {
1081        Self::NAME.to_string()
1082    }
1083
1084    fn description(&self) -> String {
1085        "Retrieves the full content of a stored resource by its ID. Returns the content as plain text if UTF-8 encoded, or as a base64url-encoded string for binary data. If the resource has no local blob but has a URI, it will be fetched from the remote source.".to_string()
1086    }
1087
1088    fn definition(&self) -> FunctionDefinition {
1089        FunctionDefinition {
1090            name: self.name(),
1091            description: self.description(),
1092            parameters: self.schema.clone(),
1093            strict: Some(true),
1094        }
1095    }
1096
1097    async fn call(
1098        &self,
1099        ctx: BaseCtx,
1100        args: Self::Args,
1101        _resources: Vec<Resource>,
1102    ) -> Result<ToolOutput<Self::Output>, BoxError> {
1103        let res = self.memory.get_resource(args._id).await?;
1104        let text = match res.blob {
1105            Some(blob) => match String::from_utf8(blob.0) {
1106                Ok(s) => s,
1107                Err(e) => ByteBufB64(e.into_bytes()).to_string(),
1108            },
1109            None => match res.uri {
1110                Some(uri) => FetchWebResourcesTool::fetch_as_text(&ctx, &uri).await?,
1111                None => Err(format!("Invalid resource {}, no blob or uri", args._id))?,
1112            },
1113        };
1114
1115        Ok(ToolOutput::new(Response::ok(text.into())))
1116    }
1117}
1118
1119/// Arguments for "list_previous_conversations" tool
1120#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1121pub struct ListConversationsArgs {
1122    /// The cursor for pagination
1123    #[serde(default)]
1124    pub cursor: String,
1125    /// The limit for pagination, max 100
1126    #[serde(default)]
1127    pub limit: usize,
1128}
1129
1130#[derive(Debug, Clone)]
1131pub struct ListConversationsTool {
1132    conversations: Conversations,
1133    description: String,
1134}
1135
1136impl ListConversationsTool {
1137    pub const NAME: &'static str = "list_previous_conversations";
1138
1139    /// Creates a new ListConversationsTool instance
1140    pub fn new(conversations: Conversations) -> Self {
1141        Self { conversations, description: "Lists the current user's previous conversations in reverse chronological order with cursor-based pagination. Returns conversation metadata including status, timestamps, messages, and usage statistics. Use the cursor parameter to paginate through older conversations.".to_string() }
1142    }
1143
1144    pub fn with_description(mut self, description: String) -> Self {
1145        self.description = description;
1146        self
1147    }
1148}
1149
1150impl Tool<BaseCtx> for ListConversationsTool {
1151    type Args = ListConversationsArgs;
1152    type Output = Response;
1153
1154    fn name(&self) -> String {
1155        Self::NAME.to_string()
1156    }
1157
1158    fn description(&self) -> String {
1159        self.description.clone()
1160    }
1161
1162    fn definition(&self) -> FunctionDefinition {
1163        FunctionDefinition {
1164            name: self.name(),
1165            description: self.description.clone(),
1166            parameters: json!({
1167                "type": "object",
1168                "properties": {
1169                    "cursor": {
1170                        "type": "string",
1171                        "description": "The cursor for pagination, returned from the previous call. Use an empty string for the first page."
1172                    },
1173                    "limit": {
1174                        "type": "integer",
1175                        "description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
1176                    }
1177                },
1178                "required": ["cursor", "limit"],
1179                "additionalProperties": false
1180            }),
1181            strict: Some(true),
1182        }
1183    }
1184
1185    async fn call(
1186        &self,
1187        ctx: BaseCtx,
1188        args: Self::Args,
1189        _resources: Vec<Resource>,
1190    ) -> Result<ToolOutput<Self::Output>, BoxError> {
1191        let (conversations, next_cursor) = self
1192            .conversations
1193            .list_conversations_by_user(
1194                ctx.caller(),
1195                if args.cursor.is_empty() {
1196                    None
1197                } else {
1198                    Some(args.cursor)
1199                },
1200                if args.limit == 0 {
1201                    None
1202                } else {
1203                    Some(args.limit)
1204                },
1205            )
1206            .await?;
1207        let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
1208        Ok(ToolOutput::new(Response::Ok {
1209            result: Documents::from(docs).to_string().into(),
1210            next_cursor,
1211        }))
1212    }
1213}
1214
1215/// Arguments for "search_conversations" tool
1216#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1217pub struct SearchConversationsArgs {
1218    /// The query string to search
1219    pub query: String,
1220    /// The max number of conversations to return, max 100
1221    #[serde(default)]
1222    pub limit: usize,
1223}
1224
1225#[derive(Debug, Clone)]
1226pub struct SearchConversationsTool {
1227    conversations: Conversations,
1228    description: String,
1229}
1230
1231impl SearchConversationsTool {
1232    pub const NAME: &'static str = "search_conversations";
1233
1234    /// Creates a new SearchConversationsTool instance
1235    pub fn new(conversations: Conversations) -> Self {
1236        Self { conversations, description: "Performs a full-text search across the current user's conversation history using a query string. Searches through messages, resources, and artifacts to find relevant past conversations. Use this to recall specific topics, instructions, or context from previous interactions.".to_string() }
1237    }
1238
1239    pub fn with_description(mut self, description: String) -> Self {
1240        self.description = description;
1241        self
1242    }
1243}
1244
1245impl Tool<BaseCtx> for SearchConversationsTool {
1246    type Args = SearchConversationsArgs;
1247    type Output = Response;
1248
1249    fn name(&self) -> String {
1250        Self::NAME.to_string()
1251    }
1252
1253    fn description(&self) -> String {
1254        self.description.clone()
1255    }
1256
1257    fn definition(&self) -> FunctionDefinition {
1258        FunctionDefinition {
1259            name: self.name(),
1260            description: self.description.clone(),
1261            parameters: json!({
1262                "type": "object",
1263                "properties": {
1264                    "query": {
1265                        "type": "string",
1266                        "description": "The query string to search for in the conversation history."
1267                    },
1268                    "limit": {
1269                        "type": "integer",
1270                        "description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
1271                    }
1272                },
1273                "required": ["query", "limit"],
1274                "additionalProperties": false
1275            }),
1276            strict: Some(true),
1277        }
1278    }
1279
1280    async fn call(
1281        &self,
1282        ctx: BaseCtx,
1283        args: Self::Args,
1284        _resources: Vec<Resource>,
1285    ) -> Result<ToolOutput<Self::Output>, BoxError> {
1286        let conversations = self
1287            .conversations
1288            .search_conversations(
1289                ctx.caller(),
1290                args.query,
1291                if args.limit == 0 {
1292                    None
1293                } else {
1294                    Some(args.limit)
1295                },
1296            )
1297            .await?;
1298
1299        let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
1300        Ok(ToolOutput::new(Response::Ok {
1301            result: Documents::from(docs).to_string().into(),
1302            next_cursor: None,
1303        }))
1304    }
1305}
1306
1307/// Arguments for "memory_api" tool
1308#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
1309#[serde(tag = "type")]
1310pub enum MemoryToolArgs {
1311    GetResource {
1312        /// The ID of the resource to get
1313        _id: u64,
1314        /// The ID of the conversation where the resource is located
1315        conversation: u64,
1316    },
1317    /// Get a conversation by ID
1318    GetConversation {
1319        /// The ID of the conversation to get
1320        _id: u64,
1321    },
1322    GetConversationDelta {
1323        /// The ID of the conversation to get
1324        _id: u64,
1325        /// The messages offset for the conversation delta
1326        messages_offset: usize,
1327        /// The artifacts offset for the conversation delta
1328        artifacts_offset: usize,
1329    },
1330    StopConversation {
1331        /// The ID of the conversation to stop
1332        _id: u64,
1333    },
1334    SteerConversation {
1335        /// The ID of the conversation to steer
1336        _id: u64,
1337        /// The steering message to interrupt the agent mid-run, delivered after current tool execution, skips remaining tools.
1338        message: String,
1339    },
1340    FollowUpConversation {
1341        /// The ID of the conversation to follow up
1342        _id: u64,
1343        /// The follow-up message to be processed after the agent finishes, delivered only when agent has no more tool calls or steering messages.
1344        message: String,
1345    },
1346    DeleteConversation {
1347        /// The ID of the conversation to delete
1348        _id: u64,
1349    },
1350    /// List previous conversations
1351    ListPrevConversations {
1352        /// The cursor for pagination
1353        cursor: Option<String>,
1354        /// The limit for pagination, default to 10
1355        limit: Option<usize>,
1356    },
1357    /// Search conversations
1358    SearchConversations {
1359        /// The query string to search
1360        query: String,
1361        /// The max number of conversations to return, default to 10
1362        limit: Option<usize>,
1363    },
1364}
1365
1366/// A tool for conversation API
1367#[derive(Debug, Clone)]
1368pub struct MemoryTool {
1369    memory: Arc<MemoryManagement>,
1370    schema: Json,
1371}
1372
1373impl MemoryTool {
1374    pub const NAME: &'static str = "memory_api";
1375
1376    /// Creates a new SearchConversationsTool instance
1377    pub fn new(memory: Arc<MemoryManagement>) -> Self {
1378        let schema = memory_tool_schema();
1379        Self { memory, schema }
1380    }
1381}
1382
1383fn memory_tool_schema() -> Json {
1384    json!({
1385        "type": "object",
1386        "description": "Select one memory API action with type, then provide the fields used by that action. Fields not used by the selected type should be null.",
1387        "properties": {
1388            "type": {
1389                "type": "string",
1390                "enum": [
1391                    "GetResource",
1392                    "GetConversation",
1393                    "GetConversationDelta",
1394                    "StopConversation",
1395                    "SteerConversation",
1396                    "FollowUpConversation",
1397                    "DeleteConversation",
1398                    "ListPrevConversations",
1399                    "SearchConversations"
1400                ],
1401                "description": "Memory API action to perform."
1402            },
1403            "_id": {
1404                "type": ["integer", "null"],
1405                "description": "Resource or conversation ID. Required for actions that target a single resource or conversation."
1406            },
1407            "conversation": {
1408                "type": ["integer", "null"],
1409                "description": "Conversation ID containing the resource. Required for GetResource."
1410            },
1411            "messages_offset": {
1412                "type": ["integer", "null"],
1413                "description": "Messages offset for GetConversationDelta. Use 0 for the first delta read."
1414            },
1415            "artifacts_offset": {
1416                "type": ["integer", "null"],
1417                "description": "Artifacts offset for GetConversationDelta. Use 0 for the first delta read."
1418            },
1419            "message": {
1420                "type": ["string", "null"],
1421                "description": "Message used by SteerConversation or FollowUpConversation."
1422            },
1423            "cursor": {
1424                "type": ["string", "null"],
1425                "description": "Pagination cursor for ListPrevConversations. Use null for the first page."
1426            },
1427            "limit": {
1428                "type": ["integer", "null"],
1429                "description": "Maximum results for listing or searching conversations. Use null for the default."
1430            },
1431            "query": {
1432                "type": ["string", "null"],
1433                "description": "Search query for SearchConversations."
1434            }
1435        },
1436        "required": [
1437            "type",
1438            "_id",
1439            "conversation",
1440            "messages_offset",
1441            "artifacts_offset",
1442            "message",
1443            "cursor",
1444            "limit",
1445            "query"
1446        ],
1447        "additionalProperties": false
1448    })
1449}
1450
1451impl Tool<BaseCtx> for MemoryTool {
1452    type Args = MemoryToolArgs;
1453    type Output = Response;
1454
1455    fn name(&self) -> String {
1456        Self::NAME.to_string()
1457    }
1458
1459    fn description(&self) -> String {
1460        "A unified API for managing conversations and memory. Supports retrieving resources and conversation details, stopping or steering in-progress conversations, sending follow-up messages, deleting conversations, listing previous conversations with pagination, searching conversation history by keyword, and listing KIP command logs.".to_string()
1461    }
1462
1463    fn definition(&self) -> FunctionDefinition {
1464        FunctionDefinition {
1465            name: self.name(),
1466            description: self.description(),
1467            parameters: self.schema.clone(),
1468            strict: Some(true),
1469        }
1470    }
1471
1472    async fn call(
1473        &self,
1474        ctx: BaseCtx,
1475        args: Self::Args,
1476        _resources: Vec<Resource>,
1477    ) -> Result<ToolOutput<Self::Output>, BoxError> {
1478        match args {
1479            MemoryToolArgs::GetResource { _id, conversation } => {
1480                let conversation = self.memory.get_conversation(conversation).await?;
1481                if &conversation.user != ctx.caller() {
1482                    return Err("permission denied".into());
1483                }
1484
1485                let mut res = self.memory.get_resource(_id).await?;
1486                if res.blob.is_none()
1487                    && let Some(uri) = &res.uri
1488                {
1489                    res.blob = FetchWebResourcesTool::fetch_as_bytes(&ctx, uri).await.ok();
1490                }
1491
1492                Ok(ToolOutput::new(Response::Ok {
1493                    result: json!(res),
1494                    next_cursor: None,
1495                }))
1496            }
1497            MemoryToolArgs::GetConversation { _id } => {
1498                let conversation = self.memory.get_conversation(_id).await?;
1499                if &conversation.user != ctx.caller() {
1500                    return Err("permission denied".into());
1501                }
1502
1503                Ok(ToolOutput::new(Response::Ok {
1504                    result: json!(conversation),
1505                    next_cursor: None,
1506                }))
1507            }
1508            MemoryToolArgs::GetConversationDelta {
1509                _id,
1510                messages_offset,
1511                artifacts_offset,
1512            } => {
1513                let conversation = self.memory.get_conversation(_id).await?;
1514                if &conversation.user != ctx.caller() {
1515                    return Err("permission denied".into());
1516                }
1517
1518                Ok(ToolOutput::new(Response::Ok {
1519                    result: json!(conversation.into_delta(messages_offset, artifacts_offset)),
1520                    next_cursor: None,
1521                }))
1522            }
1523            MemoryToolArgs::StopConversation { _id } => {
1524                let mut conversation = self.memory.get_conversation(_id).await?;
1525                if &conversation.user != ctx.caller() {
1526                    return Err("permission denied".into());
1527                }
1528
1529                if conversation.status == ConversationStatus::Working
1530                    || conversation.status == ConversationStatus::Submitted
1531                {
1532                    conversation.status = ConversationStatus::Cancelled;
1533                    conversation.updated_at = unix_ms();
1534                    let changes = BTreeMap::from([
1535                        (
1536                            "status".to_string(),
1537                            Fv::Text(conversation.status.to_string()),
1538                        ),
1539                        ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1540                    ]);
1541                    self.memory.update_conversation(_id, changes).await?;
1542                }
1543
1544                Ok(ToolOutput::new(Response::Ok {
1545                    result: json!(conversation),
1546                    next_cursor: None,
1547                }))
1548            }
1549            MemoryToolArgs::SteerConversation { _id, message } => {
1550                if message.trim().is_empty() {
1551                    return Err("steering message cannot be empty".into());
1552                }
1553
1554                let mut conversation = self.memory.get_conversation(_id).await?;
1555                if &conversation.user != ctx.caller() {
1556                    return Err("permission denied".into());
1557                }
1558
1559                let steering_messages = if let Some(msg) = conversation.steering_messages.clone() {
1560                    let mut msgs = msg;
1561                    msgs.push(message.clone());
1562                    msgs
1563                } else {
1564                    vec![message.clone()]
1565                };
1566                conversation.steering_messages = Some(steering_messages.clone());
1567                conversation.updated_at = unix_ms();
1568                let changes = BTreeMap::from([
1569                    ("steering_messages".to_string(), steering_messages.into()),
1570                    ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1571                ]);
1572                self.memory.update_conversation(_id, changes).await?;
1573
1574                Ok(ToolOutput::new(Response::Ok {
1575                    result: json!(conversation),
1576                    next_cursor: None,
1577                }))
1578            }
1579            MemoryToolArgs::FollowUpConversation { _id, message } => {
1580                if message.trim().is_empty() {
1581                    return Err("follow-up message cannot be empty".into());
1582                }
1583
1584                let mut conversation = self.memory.get_conversation(_id).await?;
1585                if &conversation.user != ctx.caller() {
1586                    return Err("permission denied".into());
1587                }
1588
1589                let follow_up_messages = if let Some(msg) = conversation.follow_up_messages.clone()
1590                {
1591                    let mut msgs = msg;
1592                    msgs.push(message.clone());
1593                    msgs
1594                } else {
1595                    vec![message.clone()]
1596                };
1597                conversation.follow_up_messages = Some(follow_up_messages.clone());
1598                conversation.updated_at = unix_ms();
1599                let changes = BTreeMap::from([
1600                    ("follow_up_messages".to_string(), follow_up_messages.into()),
1601                    ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1602                ]);
1603                self.memory.update_conversation(_id, changes).await?;
1604
1605                Ok(ToolOutput::new(Response::Ok {
1606                    result: json!(conversation),
1607                    next_cursor: None,
1608                }))
1609            }
1610            MemoryToolArgs::DeleteConversation { _id } => {
1611                let conversation = self.memory.get_conversation(_id).await?;
1612                if &conversation.user != ctx.caller() {
1613                    return Err("permission denied".into());
1614                }
1615
1616                let deleted = self.memory.delete_conversation(_id).await?;
1617                Ok(ToolOutput::new(Response::Ok {
1618                    result: json!({ "deleted": deleted }),
1619                    next_cursor: None,
1620                }))
1621            }
1622            MemoryToolArgs::ListPrevConversations { cursor, limit } => {
1623                let (conversations, next_cursor) = self
1624                    .memory
1625                    .list_conversations_by_user(ctx.caller(), cursor, limit)
1626                    .await?;
1627
1628                Ok(ToolOutput::new(Response::Ok {
1629                    result: json!(conversations),
1630                    next_cursor,
1631                }))
1632            }
1633            MemoryToolArgs::SearchConversations { query, limit } => {
1634                let conversations = self
1635                    .memory
1636                    .search_conversations(ctx.caller(), query, limit)
1637                    .await?;
1638
1639                Ok(ToolOutput::new(Response::Ok {
1640                    result: json!(conversations),
1641                    next_cursor: None,
1642                }))
1643            }
1644        }
1645    }
1646}
1647
1648#[cfg(test)]
1649mod tests {
1650    use super::*;
1651
1652    #[test]
1653    fn test_conversation_status() {
1654        let chat = Conversation {
1655            status: ConversationStatus::Completed,
1656            ..Default::default()
1657        };
1658        let rt = ConversationStatus::Completed;
1659        println!("{}", rt);
1660
1661        let rt = serde_json::to_string(&chat).unwrap();
1662        assert!(rt.contains(r#","status":"completed","#));
1663        let chat2: Conversation = serde_json::from_str(&rt).unwrap();
1664        assert_eq!(chat.status, chat2.status);
1665
1666        let args = MemoryToolArgs::GetConversation { _id: 1 };
1667        let rt = serde_json::to_string(&args).unwrap();
1668        assert_eq!(rt, r#"{"type":"GetConversation","_id":1}"#);
1669        let args1: MemoryToolArgs = serde_json::from_str(&rt).unwrap();
1670        assert_eq!(args, args1);
1671
1672        let strict_args: MemoryToolArgs = serde_json::from_value(json!({
1673            "type": "GetConversation",
1674            "_id": 1,
1675            "conversation": null,
1676            "messages_offset": null,
1677            "artifacts_offset": null,
1678            "message": null,
1679            "cursor": null,
1680            "limit": null,
1681            "query": null
1682        }))
1683        .unwrap();
1684        assert_eq!(strict_args, args);
1685
1686        let schema = memory_tool_schema();
1687        let required = schema["required"].as_array().unwrap();
1688        let properties = schema["properties"].as_object().unwrap();
1689        assert_eq!(required.len(), properties.len());
1690        for key in properties.keys() {
1691            assert!(required.iter().any(|item| item.as_str() == Some(key)));
1692        }
1693    }
1694}