1use anda_cognitive_nexus::{CognitiveNexus, ConceptPK};
10use anda_core::{
11 BoxError, ContentPart, Document, Documents, FunctionDefinition, Message, Resource, ResourceRef,
12 StateFeatures, Tool, ToolOutput, Usage, Xid, gen_schema_for,
13};
14use anda_db::{
15 collection::{Collection, CollectionConfig},
16 database::AndaDB,
17 error::DBError,
18 index::BTree,
19 query::{Filter, Query, RangeQuery, Search},
20};
21use anda_db_schema::{
22 AndaDBSchema, FieldEntry, FieldKey, FieldType, Ft, Fv, Json, Schema, SchemaError,
23};
24use anda_db_tfs::jieba_tokenizer;
25use anda_kip::{
26 DescribeTarget, KipError, META_SYSTEM_NAME, MetaCommand, PERSON_TYPE, Request, Response,
27};
28use candid::Principal;
29use ciborium::cbor;
30use ic_auth_types::ByteBufB64;
31use schemars::JsonSchema;
32use serde::{Deserialize, Serialize};
33use serde_json::{Map, json};
34use std::{
35 collections::{BTreeMap, HashMap},
36 fmt,
37 sync::{Arc, LazyLock},
38};
39
40use crate::{context::BaseCtx, extension::fetch::FetchWebResourcesTool, rfc3339_datetime, unix_ms};
41
42pub static FUNCTION_DEFINITION: LazyLock<FunctionDefinition> = LazyLock::new(|| {
43 serde_json::from_value(json!({
44 "name": "execute_kip",
45 "description": "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to interact with your persistent memory.",
46 "parameters": {
47 "type": "object",
48 "properties": {
49 "commands": {
50 "type": "array",
51 "description": "An array of KIP commands for batch execution (reduces round-trips). Commands are executed sequentially; execution stops on first KML error.",
52 "items": {
53 "type": "string"
54 }
55 },
56 "parameters": {
57 "type": "object",
58 "description": "An optional JSON object of key-value pairs used for safe substitution of placeholders in the command string(s). Placeholders should start with ':' (e.g., :name, :limit). IMPORTANT: A placeholder must represent a complete JSON value token (e.g., name: :name). Do not embed placeholders inside quoted strings (e.g., \"Hello :name\"), because substitution uses JSON serialization."
59 },
60 },
61 "required": ["commands", "parameters"],
62 "additionalProperties": false
63 }
64 })).unwrap()
65});
66
67#[derive(Debug, Clone, Deserialize, Serialize, AndaDBSchema)]
71pub struct Conversation {
72 pub _id: u64,
74
75 #[field_type = "Bytes"]
77 pub user: Principal,
78
79 #[field_type = "Option<Bytes>"]
81 #[serde(skip_serializing_if = "Option::is_none")]
82 pub thread: Option<Xid>,
83
84 pub messages: Vec<Json>,
86
87 pub resources: Vec<Resource>,
89
90 pub artifacts: Vec<Resource>,
92
93 #[field_type = "Text"]
95 pub status: ConversationStatus,
96
97 #[serde(skip_serializing_if = "Option::is_none")]
99 pub failed_reason: Option<String>,
100
101 #[field_type = "Map<String, U64>"]
103 pub usage: Usage,
104
105 #[serde(skip_serializing_if = "Option::is_none")]
110 pub steering_messages: Option<Vec<String>>,
111
112 #[serde(skip_serializing_if = "Option::is_none")]
117 pub follow_up_messages: Option<Vec<String>>,
118
119 #[serde(skip_serializing_if = "Option::is_none")]
121 pub child: Option<u64>,
122
123 #[serde(skip_serializing_if = "Option::is_none")]
126 pub ancestors: Option<Vec<u64>>,
127
128 #[serde(skip_serializing_if = "Option::is_none")]
130 pub label: Option<String>,
131
132 #[serde(skip_serializing_if = "Option::is_none")]
137 pub extra: Option<Json>,
138
139 pub period: u64,
142
143 pub created_at: u64,
145
146 pub updated_at: u64,
148}
149
150impl Default for Conversation {
151 fn default() -> Self {
152 Self {
153 _id: 0,
154 user: Principal::anonymous(),
155 thread: None,
156 messages: Vec::new(),
157 resources: Vec::new(),
158 artifacts: Vec::new(),
159 status: ConversationStatus::default(),
160 failed_reason: None,
161 usage: Usage::default(),
162 steering_messages: None,
163 follow_up_messages: None,
164 child: None,
165 ancestors: None,
166 label: None,
167 extra: None,
168 period: 0,
169 created_at: 0,
170 updated_at: 0,
171 }
172 }
173}
174
175impl Conversation {
176 pub fn append_messages(&mut self, message: Vec<Message>) {
177 self.messages.extend(message.into_iter().map(|v| json!(v)));
178 }
179
180 pub fn to_changes(&self) -> Result<BTreeMap<String, Fv>, BoxError> {
181 let mut changes = BTreeMap::from([
182 (
183 "messages".to_string(),
184 Fv::array_from(cbor!(self.messages).unwrap(), &[Ft::Json])?,
185 ),
186 (
187 "resources".to_string(),
188 Fv::array_from(cbor!(self.resources).unwrap(), &[Resource::field_type()])?,
189 ),
190 (
191 "artifacts".to_string(),
192 Fv::array_from(cbor!(self.artifacts).unwrap(), &[Resource::field_type()])?,
193 ),
194 ("status".to_string(), Fv::Text(self.status.to_string())),
195 (
196 "usage".to_string(),
197 Fv::map_from(
198 cbor!(self.usage).unwrap(),
199 &BTreeMap::from([("*".into(), Ft::U64)]),
200 )?,
201 ),
202 ("updated_at".to_string(), Fv::U64(self.updated_at)),
203 (
204 "steering_messages".to_string(),
205 if let Some(msg) = self.steering_messages.clone() {
206 msg.into()
207 } else {
208 Fv::Null
209 },
210 ),
211 (
212 "follow_up_messages".to_string(),
213 if let Some(msg) = self.follow_up_messages.clone() {
214 msg.into()
215 } else {
216 Fv::Null
217 },
218 ),
219 (
220 "label".to_string(),
221 if let Some(label) = self.label.clone() {
222 label.into()
223 } else {
224 Fv::Null
225 },
226 ),
227 (
228 "extra".to_string(),
229 if let Some(extra) = self.extra.clone() {
230 extra.into()
231 } else {
232 Fv::Null
233 },
234 ),
235 ]);
236
237 if let Some(child) = self.child {
238 changes.insert("child".to_string(), Fv::U64(child));
239 }
240 if let Some(reason) = &self.failed_reason {
241 changes.insert("failed_reason".to_string(), Fv::Text(reason.clone()));
242 }
243 Ok(changes)
244 }
245
246 pub fn to_delta(&self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
247 ConversationDelta {
248 _id: self._id,
249 messages: self
250 .messages
251 .iter()
252 .skip(messages_offset)
253 .cloned()
254 .collect(),
255 artifacts: self
256 .artifacts
257 .iter()
258 .skip(artifacts_offset)
259 .cloned()
260 .collect(),
261 status: self.status.clone(),
262 usage: self.usage.clone(),
263 failed_reason: self.failed_reason.clone(),
264 updated_at: self.updated_at,
265 child: self.child,
266 }
267 }
268
269 pub fn into_delta(self, messages_offset: usize, artifacts_offset: usize) -> ConversationDelta {
270 ConversationDelta {
271 _id: self._id,
272 messages: self.messages.into_iter().skip(messages_offset).collect(),
273 artifacts: self.artifacts.into_iter().skip(artifacts_offset).collect(),
274 status: self.status,
275 usage: self.usage,
276 failed_reason: self.failed_reason,
277 updated_at: self.updated_at,
278 child: self.child,
279 }
280 }
281}
282
283#[derive(Debug, Clone, Default, Deserialize, Serialize, PartialEq, Eq)]
284pub struct PrunedMessage {
285 pub role: String,
286
287 pub content: Vec<ContentPart>,
288
289 #[serde(skip_serializing_if = "Option::is_none")]
290 pub name: Option<String>,
291
292 #[serde(skip_serializing_if = "Option::is_none")]
293 pub user: Option<String>,
294
295 #[serde(skip_serializing_if = "Option::is_none")]
296 pub timestamp: Option<String>,
297}
298
299impl PrunedMessage {
300 pub fn try_from(mut msg: Message) -> Option<Self> {
301 msg.prune_content();
302 Some(Self {
303 role: msg.role,
304 content: msg.content,
305 name: msg.name,
306 user: msg.user.map(|u| u.to_string()),
307 timestamp: msg.timestamp.and_then(rfc3339_datetime),
308 })
309 }
310}
311
312impl From<Conversation> for Document {
313 fn from(conversation: Conversation) -> Self {
314 let mut metadata = BTreeMap::from([
315 ("_id".to_string(), conversation._id.into()),
316 ("type".to_string(), "Conversation".into()),
317 ("user".to_string(), conversation.user.to_string().into()),
318 ("status".to_string(), conversation.status.to_string().into()),
319 (
320 "created_at".to_string(),
321 rfc3339_datetime(conversation.created_at).unwrap().into(),
322 ),
323 (
324 "updated_at".to_string(),
325 rfc3339_datetime(conversation.updated_at).unwrap().into(),
326 ),
327 ]);
328 if let Some(thread) = conversation.thread {
329 metadata.insert("thread".to_string(), thread.to_string().into());
330 }
331 if let Some(label) = conversation.label {
332 metadata.insert("label".to_string(), label.into());
333 }
334 let message: Vec<PrunedMessage> = conversation
335 .messages
336 .iter()
337 .filter_map(|v| {
338 serde_json::from_value::<Message>(v.clone())
339 .ok()
340 .and_then(PrunedMessage::try_from)
341 })
342 .collect();
343 Self {
344 content: serde_json::to_value(message).unwrap_or_default(),
345 metadata,
346 }
347 }
348}
349
350#[derive(Debug, Serialize)]
351pub struct ConversationRef<'a> {
352 pub _id: u64,
353 pub user: &'a Principal,
354 pub thread: Option<&'a Xid>,
355 pub messages: &'a [Json],
356 pub resources: &'a [Resource],
357 pub artifacts: &'a [Resource],
358 pub status: &'a ConversationStatus,
359 pub usage: &'a Usage,
360 #[serde(skip_serializing_if = "Option::is_none")]
361 pub steering_messages: &'a Option<Vec<String>>,
362 #[serde(skip_serializing_if = "Option::is_none")]
363 pub follow_up_messages: &'a Option<Vec<String>>,
364 #[serde(skip_serializing_if = "Option::is_none")]
365 pub label: &'a Option<String>,
366 pub extra: &'a Option<Json>,
367 pub period: u64,
368 pub created_at: u64,
369 pub updated_at: u64,
370 #[serde(skip_serializing_if = "Option::is_none")]
371 pub child: &'a Option<u64>,
372 #[serde(skip_serializing_if = "Option::is_none")]
373 pub ancestors: &'a Option<Vec<u64>>,
374}
375
376impl<'a> From<&'a Conversation> for ConversationRef<'a> {
377 fn from(conversation: &'a Conversation) -> Self {
378 Self {
379 _id: conversation._id,
380 user: &conversation.user,
381 thread: conversation.thread.as_ref(),
382 messages: &conversation.messages,
383 resources: &conversation.resources,
384 artifacts: &conversation.artifacts,
385 status: &conversation.status,
386 usage: &conversation.usage,
387 steering_messages: &conversation.steering_messages,
388 follow_up_messages: &conversation.follow_up_messages,
389 label: &conversation.label,
390 extra: &conversation.extra,
391 period: conversation.period,
392 created_at: conversation.created_at,
393 updated_at: conversation.updated_at,
394 child: &conversation.child,
395 ancestors: &conversation.ancestors,
396 }
397 }
398}
399
400#[derive(Debug, Clone, Deserialize, Serialize)]
401pub struct ConversationState {
402 pub _id: u64,
403 pub status: ConversationStatus,
404}
405
406impl From<&ConversationRef<'_>> for ConversationState {
407 fn from(conversation: &ConversationRef<'_>) -> Self {
408 Self {
409 _id: conversation._id,
410 status: conversation.status.clone(),
411 }
412 }
413}
414
415impl From<&Conversation> for ConversationState {
416 fn from(conversation: &Conversation) -> Self {
417 Self {
418 _id: conversation._id,
419 status: conversation.status.clone(),
420 }
421 }
422}
423
424#[derive(Debug, Clone, Deserialize, Serialize)]
426pub struct ConversationDelta {
427 pub _id: u64,
428 pub messages: Vec<Json>,
430 pub artifacts: Vec<Resource>,
431 pub status: ConversationStatus,
432 pub usage: Usage,
433 pub failed_reason: Option<String>,
434 pub updated_at: u64,
435 pub child: Option<u64>,
436}
437
438#[derive(Debug, Default, Clone, Deserialize, Serialize, PartialEq, Eq)]
439#[serde(rename_all = "lowercase")]
440pub enum ConversationStatus {
441 #[default]
442 Submitted,
443 Working,
444 Idle,
445 Completed,
446 Cancelled,
447 Failed,
448}
449
450impl fmt::Display for ConversationStatus {
451 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452 match self {
453 ConversationStatus::Submitted => write!(f, "submitted"),
454 ConversationStatus::Working => write!(f, "working"),
455 ConversationStatus::Idle => write!(f, "idle"),
456 ConversationStatus::Completed => write!(f, "completed"),
457 ConversationStatus::Cancelled => write!(f, "cancelled"),
458 ConversationStatus::Failed => write!(f, "failed"),
459 }
460 }
461}
462
463#[derive(Debug, Clone)]
464pub struct Conversations {
465 pub conversations: Arc<Collection>,
466}
467
468impl Conversations {
469 pub async fn connect(db: Arc<AndaDB>, name: String) -> Result<Self, BoxError> {
470 let mut schema = Conversation::schema()?;
471 schema.with_version(4);
472
473 let conversations = db
474 .open_or_create_collection(
475 schema,
476 CollectionConfig {
477 name,
478 description: "conversations collection".to_string(),
479 },
480 async |collection| {
481 collection.set_tokenizer(jieba_tokenizer());
483 collection.create_btree_index_nx(&["user"]).await?;
485 collection.create_btree_index_nx(&["thread"]).await?;
486 collection.create_btree_index_nx(&["period"]).await?;
487 collection
488 .create_bm25_index_nx(&["messages", "resources", "artifacts"])
489 .await?;
490
491 Ok::<(), DBError>(())
492 },
493 )
494 .await?;
495
496 Ok(Self { conversations })
497 }
498
499 pub async fn add_conversation(
500 &self,
501 conversation: ConversationRef<'_>,
502 ) -> Result<u64, DBError> {
503 let id = self.conversations.add_from(&conversation).await?;
504 self.conversations.flush(unix_ms()).await?;
505 Ok(id)
506 }
507
508 pub async fn update_conversation(
509 &self,
510 id: u64,
511 fields: BTreeMap<String, Fv>,
512 ) -> Result<(), DBError> {
513 self.conversations.update(id, fields).await?;
514 self.conversations.flush(unix_ms()).await?;
515 Ok(())
516 }
517
518 pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
519 self.conversations.get_as(id).await
520 }
521
522 pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
523 let doc = self.conversations.remove(id).await?;
524 self.conversations.flush(unix_ms()).await?;
525 Ok(doc.is_some())
526 }
527
528 pub async fn batch_get_conversations(
529 &self,
530 user: &Principal,
531 ids: Vec<u64>,
532 ) -> Result<Vec<Conversation>, BoxError> {
533 let filter = Some(Filter::And(vec![
534 Box::new(Filter::Field((
535 "_id".to_string(),
536 RangeQuery::Include(ids.into_iter().map(Fv::U64).collect()),
537 ))),
538 Box::new(Filter::Field((
539 "user".to_string(),
540 RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
541 ))),
542 ]));
543
544 let rt: Vec<Conversation> = self
545 .conversations
546 .search_as(Query {
547 search: None,
548 filter,
549 limit: None,
550 })
551 .await?;
552 Ok(rt)
553 }
554
555 pub async fn list_conversations_by_user(
556 &self,
557 user: &Principal,
558 cursor: Option<String>,
559 limit: Option<usize>,
560 ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
561 let limit = limit.unwrap_or(10).min(100);
562 let cursor = match BTree::from_cursor::<u64>(&cursor)? {
563 Some(cursor) => cursor,
564 None => self.conversations.max_document_id() + 1,
565 };
566 let filter = Some(Filter::And(vec![
567 Box::new(Filter::Field((
568 "user".to_string(),
569 RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
570 ))),
571 Box::new(Filter::Field((
572 "_id".to_string(),
573 RangeQuery::Lt(Fv::U64(cursor)),
574 ))),
575 ]));
576
577 let rt: Vec<Conversation> = self
578 .conversations
579 .search_as(Query {
580 search: None,
581 filter,
582 limit: Some(limit),
583 })
584 .await?;
585 let cursor = if rt.len() >= limit {
586 BTree::to_cursor(&rt.first().unwrap()._id)
587 } else {
588 None
589 };
590 Ok((rt, cursor))
591 }
592
593 pub async fn search_conversations(
594 &self,
595 user: &Principal,
596 query: String,
597 limit: Option<usize>,
598 ) -> Result<Vec<Conversation>, BoxError> {
599 let rt = self
600 .conversations
601 .search_as(Query {
602 search: Some(Search {
603 text: Some(query.to_string()),
604 logical_search: true,
605 ..Default::default()
606 }),
607 filter: Some(Filter::Field((
608 "user".to_string(),
609 RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
610 ))),
611 limit,
612 })
613 .await?;
614 Ok(rt)
615 }
616
617 pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
618 let period = timestamp / 3600 / 1000;
619 let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
620 let ids = self
621 .conversations
622 .search_ids(Query {
623 search: None,
624 filter: Some(filter),
625 limit: None,
626 })
627 .await?;
628 let count = ids.len() as u64;
629 for id in ids {
630 let _ = self.conversations.remove(id).await;
631 }
632 let now_ms = unix_ms();
633 self.conversations.flush(now_ms).await?;
634 Ok(count)
635 }
636}
637
638#[derive(Debug, Clone)]
639pub struct MemoryManagement {
640 pub nexus: Arc<CognitiveNexus>,
641 pub conversations: Arc<Collection>,
642 pub resources: Arc<Collection>,
643 pub kip_function_definitions: FunctionDefinition,
644}
645
646impl MemoryManagement {
647 pub async fn connect(db: Arc<AndaDB>, nexus: Arc<CognitiveNexus>) -> Result<Self, BoxError> {
648 let mut schema = Conversation::schema()?;
649 schema.with_version(4);
650
651 let conversations = db
652 .open_or_create_collection(
653 schema,
654 CollectionConfig {
655 name: "conversations".to_string(),
656 description: "conversations collection".to_string(),
657 },
658 async |collection| {
659 collection.set_tokenizer(jieba_tokenizer());
661 collection.create_btree_index_nx(&["user"]).await?;
663 collection.create_btree_index_nx(&["thread"]).await?;
664 collection.create_btree_index_nx(&["period"]).await?;
665 collection
666 .create_bm25_index_nx(&["messages", "resources", "artifacts"])
667 .await?;
668
669 Ok::<(), DBError>(())
670 },
671 )
672 .await?;
673
674 let schema = Resource::schema()?;
675 let resources = db
676 .open_or_create_collection(
677 schema,
678 CollectionConfig {
679 name: "resources".to_string(),
680 description: "Resources collection".to_string(),
681 },
682 async |collection| {
683 collection.set_tokenizer(jieba_tokenizer());
685 collection.create_btree_index_nx(&["tags"]).await?;
687 collection.create_btree_index_nx(&["hash"]).await?;
688 collection.create_btree_index_nx(&["mime_type"]).await?;
689 collection
690 .create_bm25_index_nx(&["name", "description", "metadata"])
691 .await?;
692
693 Ok::<(), DBError>(())
694 },
695 )
696 .await?;
697
698 Ok(Self {
699 nexus,
700 conversations,
701 resources,
702 kip_function_definitions: FUNCTION_DEFINITION.clone(),
703 })
704 }
705
706 pub fn with_kip_function_definitions(mut self, def: FunctionDefinition) -> Self {
707 self.kip_function_definitions = def;
708 self
709 }
710
711 pub fn nexus(&self) -> Arc<CognitiveNexus> {
712 self.nexus.clone()
713 }
714
715 pub fn max_conversation_id(&self) -> u64 {
716 self.conversations.max_document_id()
717 }
718
719 pub async fn describe_primer(&self) -> Result<Json, KipError> {
720 let (primer, _) = self
721 .nexus
722 .execute_meta(MetaCommand::Describe(DescribeTarget::Primer))
723 .await?;
724 Ok(primer)
725 }
726
727 pub async fn describe_system(&self) -> Result<Json, KipError> {
728 let system = self
729 .nexus
730 .get_concept(&ConceptPK::Object {
731 r#type: PERSON_TYPE.to_string(),
732 name: META_SYSTEM_NAME.to_string(),
733 })
734 .await?;
735 let (domains, _) = self
736 .nexus
737 .execute_meta(MetaCommand::Describe(DescribeTarget::Domains))
738 .await?;
739 Ok(json!({
740 "identity": system.to_concept_node(),
741 "domains": domains,
742 }))
743 }
744
745 pub async fn describe_caller(&self, id: &Principal) -> Result<Json, KipError> {
746 let user = self
747 .nexus
748 .get_concept(&ConceptPK::Object {
749 r#type: PERSON_TYPE.to_string(),
750 name: id.to_string(),
751 })
752 .await?;
753
754 Ok(user.to_concept_node())
755 }
756
757 pub async fn get_or_init_caller(
758 &self,
759 id: &Principal,
760 name: Option<String>,
761 ) -> Result<Json, KipError> {
762 let mut attributes = Map::new();
763 let mut metadata = Map::new();
764 attributes.insert("id".to_string(), id.to_string().into());
765 attributes.insert("person_class".to_string(), "Human".into());
766 if let Some(name) = name {
767 attributes.insert("name".to_string(), name.into());
768 }
769 metadata.insert("author".to_string(), "$system".into());
770 metadata.insert("status".to_string(), "active".into());
771 let user = self
772 .nexus
773 .get_or_init_concept(
774 PERSON_TYPE.to_string(),
775 id.to_string(),
776 attributes,
777 metadata,
778 )
779 .await?;
780
781 Ok(user.to_concept_node())
782 }
783
784 pub async fn add_resource(&self, resource: ResourceRef<'_>) -> Result<u64, DBError> {
785 let id = self.resources.add_from(&resource).await?;
786 self.resources.flush(unix_ms()).await?;
787 Ok(id)
788 }
789
790 pub async fn try_add_resources(
791 &self,
792 resources: &[Resource],
793 ) -> Result<Vec<Resource>, BoxError> {
794 let mut rs: Vec<Resource> = Vec::with_capacity(resources.len());
795 let mut count = 0;
796 for r in resources.iter() {
797 let rf: ResourceRef = r.into();
798 let id = if r._id > 0 {
799 r._id } else {
801 match self.resources.add_from(&rf).await {
802 Ok(id) => {
803 count += 1;
804 id
805 }
806 Err(DBError::AlreadyExists { _id, .. }) => _id,
807 Err(err) => Err(err)?,
808 }
809 };
810
811 let r2 = Resource {
812 _id: id,
813 blob: None,
814 ..r.clone()
815 };
816 rs.push(r2)
817 }
818
819 if count > 0 {
820 self.resources.flush(unix_ms()).await?;
821 }
822
823 Ok(rs)
824 }
825
826 pub async fn get_resource(&self, id: u64) -> Result<Resource, DBError> {
827 self.resources.get_as(id).await
828 }
829
830 pub async fn add_conversation(
831 &self,
832 conversation: ConversationRef<'_>,
833 ) -> Result<u64, DBError> {
834 let id = self.conversations.add_from(&conversation).await?;
835 self.conversations.flush(unix_ms()).await?;
836 Ok(id)
837 }
838
839 pub async fn update_conversation(
840 &self,
841 id: u64,
842 fields: BTreeMap<String, Fv>,
843 ) -> Result<(), DBError> {
844 self.conversations.update(id, fields).await?;
845 self.conversations.flush(unix_ms()).await?;
846 Ok(())
847 }
848
849 pub async fn get_conversation(&self, id: u64) -> Result<Conversation, DBError> {
850 self.conversations.get_as(id).await
851 }
852
853 pub async fn delete_conversation(&self, id: u64) -> Result<bool, DBError> {
854 let doc = self.conversations.remove(id).await?;
855 self.conversations.flush(unix_ms()).await?;
856 Ok(doc.is_some())
857 }
858
859 pub async fn list_conversations_by_user(
860 &self,
861 user: &Principal,
862 cursor: Option<String>,
863 limit: Option<usize>,
864 ) -> Result<(Vec<Conversation>, Option<String>), BoxError> {
865 let limit = limit.unwrap_or(10).min(100);
866 let cursor = match BTree::from_cursor::<u64>(&cursor)? {
867 Some(cursor) => cursor,
868 None => self.conversations.max_document_id() + 1,
869 };
870 let filter = Some(Filter::And(vec![
871 Box::new(Filter::Field((
872 "user".to_string(),
873 RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
874 ))),
875 Box::new(Filter::Field((
876 "_id".to_string(),
877 RangeQuery::Lt(Fv::U64(cursor)),
878 ))),
879 ]));
880
881 let rt: Vec<Conversation> = self
882 .conversations
883 .search_as(Query {
884 search: None,
885 filter,
886 limit: Some(limit),
887 })
888 .await?;
889 let cursor = if rt.len() >= limit {
890 BTree::to_cursor(&rt.first().unwrap()._id)
891 } else {
892 None
893 };
894 Ok((rt, cursor))
895 }
896
897 pub async fn search_conversations(
898 &self,
899 user: &Principal,
900 query: String,
901 limit: Option<usize>,
902 ) -> Result<Vec<Conversation>, BoxError> {
903 let rt = self
904 .conversations
905 .search_as(Query {
906 search: Some(Search {
907 text: Some(query.to_string()),
908 logical_search: true,
909 ..Default::default()
910 }),
911 filter: Some(Filter::Field((
912 "user".to_string(),
913 RangeQuery::Eq(Fv::Bytes(user.as_slice().to_vec())),
914 ))),
915 limit,
916 })
917 .await?;
918 Ok(rt)
919 }
920
921 pub async fn delete_expired_conversations(&self, timestamp: u64) -> Result<u64, BoxError> {
922 let period = timestamp / 3600 / 1000;
923 let filter = Filter::Field(("period".to_string(), RangeQuery::Lt(Fv::U64(period))));
924 let ids = self
925 .conversations
926 .search_ids(Query {
927 search: None,
928 filter: Some(filter),
929 limit: None,
930 })
931 .await?;
932 let count = ids.len() as u64;
933 for id in ids {
934 if let Ok(Some(doc)) = self.conversations.remove(id).await
935 && let Ok(conversation) = doc.try_into::<Conversation>()
936 {
937 for resource in conversation.resources {
938 if resource._id > 0 {
939 let _ = self.resources.remove(resource._id).await;
940 }
941 }
942
943 for artifact in conversation.artifacts {
944 if artifact._id > 0 {
945 let _ = self.resources.remove(artifact._id).await;
946 }
947 }
948 };
949 }
950
951 let now_ms = unix_ms();
952 self.conversations.flush(now_ms).await?;
953 self.resources.flush(now_ms).await?;
954 Ok(count)
955 }
956}
957
958impl Tool<BaseCtx> for MemoryManagement {
960 type Args = Request;
961 type Output = Response;
962
963 fn name(&self) -> String {
964 self.kip_function_definitions.name.clone()
965 }
966
967 fn description(&self) -> String {
968 self.kip_function_definitions.description.clone()
969 }
970
971 fn definition(&self) -> FunctionDefinition {
972 self.kip_function_definitions.clone()
973 }
974
975 async fn call(
976 &self,
977 _ctx: BaseCtx,
978 request: Self::Args,
979 _resources: Vec<Resource>,
980 ) -> Result<ToolOutput<Self::Output>, BoxError> {
981 let (_, res) = request.execute(self.nexus.as_ref()).await;
982 Ok(ToolOutput {
983 is_error: if matches!(res, Response::Err { .. }) {
984 Some(true)
985 } else {
986 None
987 },
988 output: res,
989 artifacts: Vec::new(),
990 usage: Usage::default(),
991 tools_usage: HashMap::new(),
992 })
993 }
994}
995
996#[derive(Debug, Clone)]
998pub struct MemoryReadonly {
999 memory: Arc<MemoryManagement>,
1000}
1001
1002impl MemoryReadonly {
1003 pub const NAME: &'static str = "execute_kip_readonly";
1004
1005 pub fn new(memory: Arc<MemoryManagement>) -> Self {
1007 Self { memory }
1008 }
1009}
1010
1011impl Tool<BaseCtx> for MemoryReadonly {
1012 type Args = Request;
1013 type Output = Response;
1014
1015 fn name(&self) -> String {
1016 Self::NAME.to_string()
1017 }
1018
1019 fn description(&self) -> String {
1020 "Executes one or more KIP (Knowledge Interaction Protocol) commands against the Cognitive Nexus to read from your persistent memory. This tool does not allow any modifications to the memory and is safe to use for retrieval operations.".to_string()
1021 }
1022
1023 fn definition(&self) -> FunctionDefinition {
1024 FunctionDefinition {
1025 name: self.name(),
1026 description: self.description(),
1027 parameters: self.memory.kip_function_definitions.parameters.clone(),
1028 strict: Some(true),
1029 }
1030 }
1031
1032 async fn call(
1033 &self,
1034 _ctx: BaseCtx,
1035 mut request: Self::Args,
1036 _resources: Vec<Resource>,
1037 ) -> Result<ToolOutput<Self::Output>, BoxError> {
1038 let (_, res) = request.readonly().execute(self.memory.nexus.as_ref()).await;
1039 Ok(ToolOutput {
1040 is_error: if matches!(res, Response::Err { .. }) {
1041 Some(true)
1042 } else {
1043 None
1044 },
1045 output: res,
1046 artifacts: Vec::new(),
1047 usage: Usage::default(),
1048 tools_usage: HashMap::new(),
1049 })
1050 }
1051}
1052
1053#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1055pub struct GetResourceContentArgs {
1056 pub _id: u64,
1058}
1059
1060#[derive(Debug, Clone)]
1061pub struct GetResourceContentTool {
1062 memory: Arc<MemoryManagement>,
1063 schema: Json,
1064}
1065
1066impl GetResourceContentTool {
1067 pub const NAME: &'static str = "get_resource_content";
1068
1069 pub fn new(memory: Arc<MemoryManagement>) -> Self {
1071 let schema = gen_schema_for::<GetResourceContentArgs>();
1072 Self { memory, schema }
1073 }
1074}
1075
1076impl Tool<BaseCtx> for GetResourceContentTool {
1077 type Args = GetResourceContentArgs;
1078 type Output = Response;
1079
1080 fn name(&self) -> String {
1081 Self::NAME.to_string()
1082 }
1083
1084 fn description(&self) -> String {
1085 "Retrieves the full content of a stored resource by its ID. Returns the content as plain text if UTF-8 encoded, or as a base64url-encoded string for binary data. If the resource has no local blob but has a URI, it will be fetched from the remote source.".to_string()
1086 }
1087
1088 fn definition(&self) -> FunctionDefinition {
1089 FunctionDefinition {
1090 name: self.name(),
1091 description: self.description(),
1092 parameters: self.schema.clone(),
1093 strict: Some(true),
1094 }
1095 }
1096
1097 async fn call(
1098 &self,
1099 ctx: BaseCtx,
1100 args: Self::Args,
1101 _resources: Vec<Resource>,
1102 ) -> Result<ToolOutput<Self::Output>, BoxError> {
1103 let res = self.memory.get_resource(args._id).await?;
1104 let text = match res.blob {
1105 Some(blob) => match String::from_utf8(blob.0) {
1106 Ok(s) => s,
1107 Err(e) => ByteBufB64(e.into_bytes()).to_string(),
1108 },
1109 None => match res.uri {
1110 Some(uri) => FetchWebResourcesTool::fetch_as_text(&ctx, &uri).await?,
1111 None => Err(format!("Invalid resource {}, no blob or uri", args._id))?,
1112 },
1113 };
1114
1115 Ok(ToolOutput::new(Response::ok(text.into())))
1116 }
1117}
1118
1119#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1121pub struct ListConversationsArgs {
1122 #[serde(default)]
1124 pub cursor: String,
1125 #[serde(default)]
1127 pub limit: usize,
1128}
1129
1130#[derive(Debug, Clone)]
1131pub struct ListConversationsTool {
1132 conversations: Conversations,
1133 description: String,
1134}
1135
1136impl ListConversationsTool {
1137 pub const NAME: &'static str = "list_previous_conversations";
1138
1139 pub fn new(conversations: Conversations) -> Self {
1141 Self { conversations, description: "Lists the current user's previous conversations in reverse chronological order with cursor-based pagination. Returns conversation metadata including status, timestamps, messages, and usage statistics. Use the cursor parameter to paginate through older conversations.".to_string() }
1142 }
1143
1144 pub fn with_description(mut self, description: String) -> Self {
1145 self.description = description;
1146 self
1147 }
1148}
1149
1150impl Tool<BaseCtx> for ListConversationsTool {
1151 type Args = ListConversationsArgs;
1152 type Output = Response;
1153
1154 fn name(&self) -> String {
1155 Self::NAME.to_string()
1156 }
1157
1158 fn description(&self) -> String {
1159 self.description.clone()
1160 }
1161
1162 fn definition(&self) -> FunctionDefinition {
1163 FunctionDefinition {
1164 name: self.name(),
1165 description: self.description.clone(),
1166 parameters: json!({
1167 "type": "object",
1168 "properties": {
1169 "cursor": {
1170 "type": "string",
1171 "description": "The cursor for pagination, returned from the previous call. Use an empty string for the first page."
1172 },
1173 "limit": {
1174 "type": "integer",
1175 "description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
1176 }
1177 },
1178 "required": ["cursor", "limit"],
1179 "additionalProperties": false
1180 }),
1181 strict: Some(true),
1182 }
1183 }
1184
1185 async fn call(
1186 &self,
1187 ctx: BaseCtx,
1188 args: Self::Args,
1189 _resources: Vec<Resource>,
1190 ) -> Result<ToolOutput<Self::Output>, BoxError> {
1191 let (conversations, next_cursor) = self
1192 .conversations
1193 .list_conversations_by_user(
1194 ctx.caller(),
1195 if args.cursor.is_empty() {
1196 None
1197 } else {
1198 Some(args.cursor)
1199 },
1200 if args.limit == 0 {
1201 None
1202 } else {
1203 Some(args.limit)
1204 },
1205 )
1206 .await?;
1207 let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
1208 Ok(ToolOutput::new(Response::Ok {
1209 result: Documents::from(docs).to_string().into(),
1210 next_cursor,
1211 }))
1212 }
1213}
1214
1215#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1217pub struct SearchConversationsArgs {
1218 pub query: String,
1220 #[serde(default)]
1222 pub limit: usize,
1223}
1224
1225#[derive(Debug, Clone)]
1226pub struct SearchConversationsTool {
1227 conversations: Conversations,
1228 description: String,
1229}
1230
1231impl SearchConversationsTool {
1232 pub const NAME: &'static str = "search_conversations";
1233
1234 pub fn new(conversations: Conversations) -> Self {
1236 Self { conversations, description: "Performs a full-text search across the current user's conversation history using a query string. Searches through messages, resources, and artifacts to find relevant past conversations. Use this to recall specific topics, instructions, or context from previous interactions.".to_string() }
1237 }
1238
1239 pub fn with_description(mut self, description: String) -> Self {
1240 self.description = description;
1241 self
1242 }
1243}
1244
1245impl Tool<BaseCtx> for SearchConversationsTool {
1246 type Args = SearchConversationsArgs;
1247 type Output = Response;
1248
1249 fn name(&self) -> String {
1250 Self::NAME.to_string()
1251 }
1252
1253 fn description(&self) -> String {
1254 self.description.clone()
1255 }
1256
1257 fn definition(&self) -> FunctionDefinition {
1258 FunctionDefinition {
1259 name: self.name(),
1260 description: self.description.clone(),
1261 parameters: json!({
1262 "type": "object",
1263 "properties": {
1264 "query": {
1265 "type": "string",
1266 "description": "The query string to search for in the conversation history."
1267 },
1268 "limit": {
1269 "type": "integer",
1270 "description": "The maximum number of conversations to return, between 1 and 100. Default is 10."
1271 }
1272 },
1273 "required": ["query", "limit"],
1274 "additionalProperties": false
1275 }),
1276 strict: Some(true),
1277 }
1278 }
1279
1280 async fn call(
1281 &self,
1282 ctx: BaseCtx,
1283 args: Self::Args,
1284 _resources: Vec<Resource>,
1285 ) -> Result<ToolOutput<Self::Output>, BoxError> {
1286 let conversations = self
1287 .conversations
1288 .search_conversations(
1289 ctx.caller(),
1290 args.query,
1291 if args.limit == 0 {
1292 None
1293 } else {
1294 Some(args.limit)
1295 },
1296 )
1297 .await?;
1298
1299 let docs: Vec<Document> = conversations.into_iter().map(Document::from).collect();
1300 Ok(ToolOutput::new(Response::Ok {
1301 result: Documents::from(docs).to_string().into(),
1302 next_cursor: None,
1303 }))
1304 }
1305}
1306
// Arguments of the `memory_api` tool: one variant per action, selected by the
// `type` field of the incoming JSON (internally tagged via `serde(tag = "type")`).
// The unit test in this file shows that extra keys carrying `null` for fields of
// other actions still deserialize into the selected variant.
//
// NOTE(review): plain `//` comments are used here on purpose. This type derives
// `JsonSchema`, and `///` doc comments are presumably picked up by schemars as
// schema descriptions — confirm before converting these to doc comments.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum MemoryToolArgs {
    // Fetch resource `_id`; `conversation` is the conversation whose owner is
    // checked against the caller before the resource is read.
    GetResource {
        _id: u64,
        conversation: u64,
    },
    // Fetch conversation `_id`.
    GetConversation {
        _id: u64,
    },
    // Fetch a delta of conversation `_id`; both offsets are passed to
    // `Conversation::into_delta`.
    GetConversationDelta {
        _id: u64,
        messages_offset: usize,
        artifacts_offset: usize,
    },
    // Mark conversation `_id` as cancelled if it is currently submitted or working.
    StopConversation {
        _id: u64,
    },
    // Append `message` to the steering messages of conversation `_id`.
    SteerConversation {
        _id: u64,
        message: String,
    },
    // Append `message` to the follow-up messages of conversation `_id`.
    FollowUpConversation {
        _id: u64,
        message: String,
    },
    // Delete conversation `_id`.
    DeleteConversation {
        _id: u64,
    },
    // List the caller's conversations; `cursor` and `limit` are forwarded as-is
    // to `list_conversations_by_user`.
    ListPrevConversations {
        cursor: Option<String>,
        limit: Option<usize>,
    },
    // Search the caller's conversations for `query`, with an optional result cap.
    SearchConversations {
        query: String,
        limit: Option<usize>,
    },
}
1365
/// Tool exposing the `memory_api` actions on top of a shared `MemoryManagement`.
#[derive(Debug, Clone)]
pub struct MemoryTool {
    /// Shared memory backend that every action is dispatched to.
    memory: Arc<MemoryManagement>,
    /// Parameter schema built once in `new` and cloned into each `definition()`.
    schema: Json,
}
1372
1373impl MemoryTool {
1374 pub const NAME: &'static str = "memory_api";
1375
1376 pub fn new(memory: Arc<MemoryManagement>) -> Self {
1378 let schema = memory_tool_schema();
1379 Self { memory, schema }
1380 }
1381}
1382
1383fn memory_tool_schema() -> Json {
1384 json!({
1385 "type": "object",
1386 "description": "Select one memory API action with type, then provide the fields used by that action. Fields not used by the selected type should be null.",
1387 "properties": {
1388 "type": {
1389 "type": "string",
1390 "enum": [
1391 "GetResource",
1392 "GetConversation",
1393 "GetConversationDelta",
1394 "StopConversation",
1395 "SteerConversation",
1396 "FollowUpConversation",
1397 "DeleteConversation",
1398 "ListPrevConversations",
1399 "SearchConversations"
1400 ],
1401 "description": "Memory API action to perform."
1402 },
1403 "_id": {
1404 "type": ["integer", "null"],
1405 "description": "Resource or conversation ID. Required for actions that target a single resource or conversation."
1406 },
1407 "conversation": {
1408 "type": ["integer", "null"],
1409 "description": "Conversation ID containing the resource. Required for GetResource."
1410 },
1411 "messages_offset": {
1412 "type": ["integer", "null"],
1413 "description": "Messages offset for GetConversationDelta. Use 0 for the first delta read."
1414 },
1415 "artifacts_offset": {
1416 "type": ["integer", "null"],
1417 "description": "Artifacts offset for GetConversationDelta. Use 0 for the first delta read."
1418 },
1419 "message": {
1420 "type": ["string", "null"],
1421 "description": "Message used by SteerConversation or FollowUpConversation."
1422 },
1423 "cursor": {
1424 "type": ["string", "null"],
1425 "description": "Pagination cursor for ListPrevConversations. Use null for the first page."
1426 },
1427 "limit": {
1428 "type": ["integer", "null"],
1429 "description": "Maximum results for listing or searching conversations. Use null for the default."
1430 },
1431 "query": {
1432 "type": ["string", "null"],
1433 "description": "Search query for SearchConversations."
1434 }
1435 },
1436 "required": [
1437 "type",
1438 "_id",
1439 "conversation",
1440 "messages_offset",
1441 "artifacts_offset",
1442 "message",
1443 "cursor",
1444 "limit",
1445 "query"
1446 ],
1447 "additionalProperties": false
1448 })
1449}
1450
1451impl Tool<BaseCtx> for MemoryTool {
1452 type Args = MemoryToolArgs;
1453 type Output = Response;
1454
1455 fn name(&self) -> String {
1456 Self::NAME.to_string()
1457 }
1458
1459 fn description(&self) -> String {
1460 "A unified API for managing conversations and memory. Supports retrieving resources and conversation details, stopping or steering in-progress conversations, sending follow-up messages, deleting conversations, listing previous conversations with pagination, searching conversation history by keyword, and listing KIP command logs.".to_string()
1461 }
1462
1463 fn definition(&self) -> FunctionDefinition {
1464 FunctionDefinition {
1465 name: self.name(),
1466 description: self.description(),
1467 parameters: self.schema.clone(),
1468 strict: Some(true),
1469 }
1470 }
1471
1472 async fn call(
1473 &self,
1474 ctx: BaseCtx,
1475 args: Self::Args,
1476 _resources: Vec<Resource>,
1477 ) -> Result<ToolOutput<Self::Output>, BoxError> {
1478 match args {
1479 MemoryToolArgs::GetResource { _id, conversation } => {
1480 let conversation = self.memory.get_conversation(conversation).await?;
1481 if &conversation.user != ctx.caller() {
1482 return Err("permission denied".into());
1483 }
1484
1485 let mut res = self.memory.get_resource(_id).await?;
1486 if res.blob.is_none()
1487 && let Some(uri) = &res.uri
1488 {
1489 res.blob = FetchWebResourcesTool::fetch_as_bytes(&ctx, uri).await.ok();
1490 }
1491
1492 Ok(ToolOutput::new(Response::Ok {
1493 result: json!(res),
1494 next_cursor: None,
1495 }))
1496 }
1497 MemoryToolArgs::GetConversation { _id } => {
1498 let conversation = self.memory.get_conversation(_id).await?;
1499 if &conversation.user != ctx.caller() {
1500 return Err("permission denied".into());
1501 }
1502
1503 Ok(ToolOutput::new(Response::Ok {
1504 result: json!(conversation),
1505 next_cursor: None,
1506 }))
1507 }
1508 MemoryToolArgs::GetConversationDelta {
1509 _id,
1510 messages_offset,
1511 artifacts_offset,
1512 } => {
1513 let conversation = self.memory.get_conversation(_id).await?;
1514 if &conversation.user != ctx.caller() {
1515 return Err("permission denied".into());
1516 }
1517
1518 Ok(ToolOutput::new(Response::Ok {
1519 result: json!(conversation.into_delta(messages_offset, artifacts_offset)),
1520 next_cursor: None,
1521 }))
1522 }
1523 MemoryToolArgs::StopConversation { _id } => {
1524 let mut conversation = self.memory.get_conversation(_id).await?;
1525 if &conversation.user != ctx.caller() {
1526 return Err("permission denied".into());
1527 }
1528
1529 if conversation.status == ConversationStatus::Working
1530 || conversation.status == ConversationStatus::Submitted
1531 {
1532 conversation.status = ConversationStatus::Cancelled;
1533 conversation.updated_at = unix_ms();
1534 let changes = BTreeMap::from([
1535 (
1536 "status".to_string(),
1537 Fv::Text(conversation.status.to_string()),
1538 ),
1539 ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1540 ]);
1541 self.memory.update_conversation(_id, changes).await?;
1542 }
1543
1544 Ok(ToolOutput::new(Response::Ok {
1545 result: json!(conversation),
1546 next_cursor: None,
1547 }))
1548 }
1549 MemoryToolArgs::SteerConversation { _id, message } => {
1550 if message.trim().is_empty() {
1551 return Err("steering message cannot be empty".into());
1552 }
1553
1554 let mut conversation = self.memory.get_conversation(_id).await?;
1555 if &conversation.user != ctx.caller() {
1556 return Err("permission denied".into());
1557 }
1558
1559 let steering_messages = if let Some(msg) = conversation.steering_messages.clone() {
1560 let mut msgs = msg;
1561 msgs.push(message.clone());
1562 msgs
1563 } else {
1564 vec![message.clone()]
1565 };
1566 conversation.steering_messages = Some(steering_messages.clone());
1567 conversation.updated_at = unix_ms();
1568 let changes = BTreeMap::from([
1569 ("steering_messages".to_string(), steering_messages.into()),
1570 ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1571 ]);
1572 self.memory.update_conversation(_id, changes).await?;
1573
1574 Ok(ToolOutput::new(Response::Ok {
1575 result: json!(conversation),
1576 next_cursor: None,
1577 }))
1578 }
1579 MemoryToolArgs::FollowUpConversation { _id, message } => {
1580 if message.trim().is_empty() {
1581 return Err("follow-up message cannot be empty".into());
1582 }
1583
1584 let mut conversation = self.memory.get_conversation(_id).await?;
1585 if &conversation.user != ctx.caller() {
1586 return Err("permission denied".into());
1587 }
1588
1589 let follow_up_messages = if let Some(msg) = conversation.follow_up_messages.clone()
1590 {
1591 let mut msgs = msg;
1592 msgs.push(message.clone());
1593 msgs
1594 } else {
1595 vec![message.clone()]
1596 };
1597 conversation.follow_up_messages = Some(follow_up_messages.clone());
1598 conversation.updated_at = unix_ms();
1599 let changes = BTreeMap::from([
1600 ("follow_up_messages".to_string(), follow_up_messages.into()),
1601 ("updated_at".to_string(), Fv::U64(conversation.updated_at)),
1602 ]);
1603 self.memory.update_conversation(_id, changes).await?;
1604
1605 Ok(ToolOutput::new(Response::Ok {
1606 result: json!(conversation),
1607 next_cursor: None,
1608 }))
1609 }
1610 MemoryToolArgs::DeleteConversation { _id } => {
1611 let conversation = self.memory.get_conversation(_id).await?;
1612 if &conversation.user != ctx.caller() {
1613 return Err("permission denied".into());
1614 }
1615
1616 let deleted = self.memory.delete_conversation(_id).await?;
1617 Ok(ToolOutput::new(Response::Ok {
1618 result: json!({ "deleted": deleted }),
1619 next_cursor: None,
1620 }))
1621 }
1622 MemoryToolArgs::ListPrevConversations { cursor, limit } => {
1623 let (conversations, next_cursor) = self
1624 .memory
1625 .list_conversations_by_user(ctx.caller(), cursor, limit)
1626 .await?;
1627
1628 Ok(ToolOutput::new(Response::Ok {
1629 result: json!(conversations),
1630 next_cursor,
1631 }))
1632 }
1633 MemoryToolArgs::SearchConversations { query, limit } => {
1634 let conversations = self
1635 .memory
1636 .search_conversations(ctx.caller(), query, limit)
1637 .await?;
1638
1639 Ok(ToolOutput::new(Response::Ok {
1640 result: json!(conversations),
1641 next_cursor: None,
1642 }))
1643 }
1644 }
1645 }
1646}
1647
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks status serialization, the tagged `MemoryToolArgs` wire format
    /// (including tolerance of null extras), and that the hand-written schema
    /// lists every property as required.
    #[test]
    fn test_conversation_status() {
        // `ConversationStatus` must be printable and serialize in lowercase.
        let status = ConversationStatus::Completed;
        println!("{status}");

        let original = Conversation {
            status: ConversationStatus::Completed,
            ..Default::default()
        };
        let encoded = serde_json::to_string(&original).unwrap();
        assert!(encoded.contains(r#","status":"completed","#));
        let decoded: Conversation = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original.status, decoded.status);

        // Internally tagged representation round-trips.
        let expected = MemoryToolArgs::GetConversation { _id: 1 };
        let encoded = serde_json::to_string(&expected).unwrap();
        assert_eq!(encoded, r#"{"type":"GetConversation","_id":1}"#);
        let decoded: MemoryToolArgs = serde_json::from_str(&encoded).unwrap();
        assert_eq!(expected, decoded);

        // Strict-mode payloads carry every field; unused ones are null.
        let strict_payload = json!({
            "type": "GetConversation",
            "_id": 1,
            "conversation": null,
            "messages_offset": null,
            "artifacts_offset": null,
            "message": null,
            "cursor": null,
            "limit": null,
            "query": null
        });
        let from_strict: MemoryToolArgs = serde_json::from_value(strict_payload).unwrap();
        assert_eq!(from_strict, expected);

        // Every declared property must also appear in `required`.
        let schema = memory_tool_schema();
        let required = schema["required"].as_array().unwrap();
        let properties = schema["properties"].as_object().unwrap();
        assert_eq!(required.len(), properties.len());
        let required_names: Vec<&str> = required
            .iter()
            .filter_map(|entry| entry.as_str())
            .collect();
        for key in properties.keys() {
            assert!(required_names.contains(&key.as_str()));
        }
    }
}