Skip to main content

synaptic_lark/store/
memory.rs

1use async_trait::async_trait;
2use serde_json::json;
3use synaptic_core::{MemoryStore, Message, SynapticError};
4
5use crate::{api::bitable::BitableApi, LarkConfig};
6
7pub struct LarkBitableMemoryStore {
8    api: BitableApi,
9    app_token: String,
10    table_id: String,
11}
12
13impl LarkBitableMemoryStore {
14    pub fn new(
15        config: LarkConfig,
16        app_token: impl Into<String>,
17        table_id: impl Into<String>,
18    ) -> Self {
19        Self {
20            api: BitableApi::new(config),
21            app_token: app_token.into(),
22            table_id: table_id.into(),
23        }
24    }
25
26    pub fn app_token(&self) -> &str {
27        &self.app_token
28    }
29
30    pub fn table_id(&self) -> &str {
31        &self.table_id
32    }
33}
34
35#[async_trait]
36impl MemoryStore for LarkBitableMemoryStore {
37    async fn append(&self, session_id: &str, message: Message) -> Result<(), SynapticError> {
38        let role = message.role().to_string();
39        let content = message.content().to_string();
40        let tc_slice = message.tool_calls();
41        let tool_calls = if tc_slice.is_empty() {
42            String::new()
43        } else {
44            serde_json::to_string(tc_slice).unwrap_or_default()
45        };
46        let tool_call_id = message.tool_call_id().unwrap_or("").to_string();
47        let seq = std::time::SystemTime::now()
48            .duration_since(std::time::UNIX_EPOCH)
49            .unwrap_or_default()
50            .as_millis()
51            .to_string();
52
53        let records = vec![json!({
54            "fields": {
55                "session_id": session_id,
56                "role": role,
57                "content": content,
58                "tool_calls": tool_calls,
59                "tool_call_id": tool_call_id,
60                "seq": seq,
61            }
62        })];
63        self.api
64            .batch_create_records(&self.app_token, &self.table_id, records)
65            .await
66            .map_err(|e| SynapticError::Memory(e.to_string()))?;
67        Ok(())
68    }
69
70    async fn load(&self, session_id: &str) -> Result<Vec<Message>, SynapticError> {
71        let body = json!({
72            "page_size": 500,
73            "filter": {
74                "conjunction": "and",
75                "conditions": [{
76                    "field_name": "session_id",
77                    "operator": "is",
78                    "value": [session_id]
79                }]
80            },
81            "sort": [{ "field_name": "seq", "desc": false }]
82        });
83        let items = self
84            .api
85            .search_records(&self.app_token, &self.table_id, body)
86            .await
87            .map_err(|e| SynapticError::Memory(e.to_string()))?;
88
89        let mut messages = Vec::new();
90        for item in &items {
91            let f = &item["fields"];
92            let role = f["role"].as_str().unwrap_or("human");
93            let content = f["content"].as_str().unwrap_or("").to_string();
94            let msg = match role {
95                "system" => Message::system(content),
96                "ai" | "assistant" => {
97                    let tc_str = f["tool_calls"].as_str().unwrap_or("");
98                    if tc_str.is_empty() {
99                        Message::ai(content)
100                    } else {
101                        match serde_json::from_str(tc_str) {
102                            Ok(tcs) => Message::ai_with_tool_calls(content, tcs),
103                            Err(_) => Message::ai(content),
104                        }
105                    }
106                }
107                "tool" => {
108                    let id = f["tool_call_id"].as_str().unwrap_or("").to_string();
109                    Message::tool(id, content)
110                }
111                _ => Message::human(content),
112            };
113            messages.push(msg);
114        }
115        Ok(messages)
116    }
117
118    async fn clear(&self, session_id: &str) -> Result<(), SynapticError> {
119        let search_body = json!({
120            "page_size": 500,
121            "filter": {
122                "conjunction": "and",
123                "conditions": [{
124                    "field_name": "session_id",
125                    "operator": "is",
126                    "value": [session_id]
127                }]
128            }
129        });
130        let items = self
131            .api
132            .search_records(&self.app_token, &self.table_id, search_body)
133            .await
134            .map_err(|e| SynapticError::Memory(e.to_string()))?;
135
136        let ids: Vec<String> = items
137            .iter()
138            .filter_map(|r| r["record_id"].as_str().map(String::from))
139            .collect();
140
141        if ids.is_empty() {
142            return Ok(());
143        }
144
145        self.api
146            .batch_delete_records(&self.app_token, &self.table_id, ids)
147            .await
148            .map_err(|e| SynapticError::Memory(e.to_string()))
149    }
150}