synaptic_lark/store/
checkpointer.rs1use crate::{api::bitable::BitableApi, LarkConfig};
2
3#[allow(dead_code)]
20pub struct LarkBitableCheckpointer {
21 api: BitableApi,
22 app_token: String,
23 table_id: String,
24}
25
26impl LarkBitableCheckpointer {
27 pub fn new(
33 config: LarkConfig,
34 app_token: impl Into<String>,
35 table_id: impl Into<String>,
36 ) -> Self {
37 Self {
38 api: BitableApi::new(config),
39 app_token: app_token.into(),
40 table_id: table_id.into(),
41 }
42 }
43
44 pub fn app_token(&self) -> &str {
46 &self.app_token
47 }
48}
49
50#[cfg(feature = "checkpointer")]
51mod checkpointer_impl {
52 use super::*;
53 use async_trait::async_trait;
54 use serde_json::json;
55 use synaptic_core::SynapticError;
56 use synaptic_graph::{Checkpoint, CheckpointConfig, Checkpointer};
57
58 #[async_trait]
59 impl Checkpointer for LarkBitableCheckpointer {
60 async fn put(
61 &self,
62 config: &CheckpointConfig,
63 checkpoint: &Checkpoint,
64 ) -> Result<(), SynapticError> {
65 let state_str = serde_json::to_string(&checkpoint.state)
66 .map_err(|e| SynapticError::Graph(format!("serialize state: {e}")))?;
67 let meta_str = serde_json::to_string(&checkpoint.metadata)
68 .map_err(|e| SynapticError::Graph(format!("serialize metadata: {e}")))?;
69 let records = vec![json!({
70 "fields": {
71 "thread_id": &config.thread_id,
72 "checkpoint_id": &checkpoint.id,
73 "parent_id": checkpoint.parent_id.as_deref().unwrap_or(""),
74 "state": state_str,
75 "next_node": checkpoint.next_node.as_deref().unwrap_or(""),
76 "metadata": meta_str,
77 "created_at": now_ts(),
78 }
79 })];
80 self.api
81 .batch_create_records(&self.app_token, &self.table_id, records)
82 .await
83 .map_err(|e| SynapticError::Graph(e.to_string()))?;
84 Ok(())
85 }
86
87 async fn get(
88 &self,
89 config: &CheckpointConfig,
90 ) -> Result<Option<Checkpoint>, SynapticError> {
91 let body = json!({
92 "page_size": 1,
93 "filter": {
94 "conjunction": "and",
95 "conditions": [{
96 "field_name": "thread_id",
97 "operator": "is",
98 "value": [&config.thread_id]
99 }]
100 },
101 "sort": [{ "field_name": "created_at", "desc": true }]
102 });
103 let items = self
104 .api
105 .search_records(&self.app_token, &self.table_id, body)
106 .await
107 .map_err(|e| SynapticError::Graph(e.to_string()))?;
108 match items.into_iter().next() {
109 None => Ok(None),
110 Some(item) => Ok(Some(record_to_checkpoint(&item)?)),
111 }
112 }
113
114 async fn list(&self, config: &CheckpointConfig) -> Result<Vec<Checkpoint>, SynapticError> {
115 let body = json!({
116 "page_size": 100,
117 "filter": {
118 "conjunction": "and",
119 "conditions": [{
120 "field_name": "thread_id",
121 "operator": "is",
122 "value": [&config.thread_id]
123 }]
124 },
125 "sort": [{ "field_name": "created_at", "desc": false }]
126 });
127 let items = self
128 .api
129 .search_records(&self.app_token, &self.table_id, body)
130 .await
131 .map_err(|e| SynapticError::Graph(e.to_string()))?;
132 items.iter().map(record_to_checkpoint).collect()
133 }
134 }
135
136 fn now_ts() -> String {
137 std::time::SystemTime::now()
138 .duration_since(std::time::UNIX_EPOCH)
139 .unwrap_or_default()
140 .as_secs()
141 .to_string()
142 }
143
144 fn record_to_checkpoint(item: &serde_json::Value) -> Result<Checkpoint, SynapticError> {
145 let f = &item["fields"];
146 let state: serde_json::Value = serde_json::from_str(f["state"].as_str().unwrap_or("{}"))
147 .map_err(|e| SynapticError::Graph(format!("deserialize state: {e}")))?;
148 let metadata: std::collections::HashMap<String, serde_json::Value> =
149 serde_json::from_str(f["metadata"].as_str().unwrap_or("{}")).unwrap_or_default();
150 let next_node = f["next_node"]
151 .as_str()
152 .filter(|s| !s.is_empty())
153 .map(String::from);
154 let parent_id = f["parent_id"]
155 .as_str()
156 .filter(|s| !s.is_empty())
157 .map(String::from);
158 let id = f["checkpoint_id"].as_str().unwrap_or("").to_string();
159 Ok(Checkpoint {
160 id,
161 state,
162 next_node,
163 parent_id,
164 metadata,
165 })
166 }
167}