Skip to main content

synaptic_lark/store/
checkpointer.rs

1use crate::{api::bitable::BitableApi, LarkConfig};
2
3/// Bitable-backed graph checkpoint store.
4///
5/// Persists [`synaptic_graph::checkpoint::Checkpoint`] snapshots into a Feishu Bitable table,
6/// enabling human-in-the-loop workflows via the Feishu UI.
7///
8/// The Bitable table must contain the following fields:
9/// - `thread_id` (Text)
10/// - `checkpoint_id` (Text)
11/// - `parent_id` (Text)
12/// - `state` (Text — JSON)
13/// - `next_node` (Text)
14/// - `metadata` (Text — JSON)
15/// - `created_at` (Text — Unix timestamp string)
16///
17/// This struct is always compiled. The [`synaptic_graph::checkpoint::Checkpointer`] impl is
18/// gated behind `#[cfg(feature = "checkpointer")]`.
19#[allow(dead_code)]
20pub struct LarkBitableCheckpointer {
21    api: BitableApi,
22    app_token: String,
23    table_id: String,
24}
25
26impl LarkBitableCheckpointer {
27    /// Create a new checkpointer.
28    ///
29    /// * `config`     — Lark application credentials and base URL.
30    /// * `app_token`  — Bitable app token (e.g. `"bascnXxx"`).
31    /// * `table_id`   — Table ID inside that Bitable (e.g. `"tblXxx"`).
32    pub fn new(
33        config: LarkConfig,
34        app_token: impl Into<String>,
35        table_id: impl Into<String>,
36    ) -> Self {
37        Self {
38            api: BitableApi::new(config),
39            app_token: app_token.into(),
40            table_id: table_id.into(),
41        }
42    }
43
44    /// Return the Bitable app token this checkpointer targets.
45    pub fn app_token(&self) -> &str {
46        &self.app_token
47    }
48}
49
50#[cfg(feature = "checkpointer")]
51mod checkpointer_impl {
52    use super::*;
53    use async_trait::async_trait;
54    use serde_json::json;
55    use synaptic_core::SynapticError;
56    use synaptic_graph::{Checkpoint, CheckpointConfig, Checkpointer};
57
58    #[async_trait]
59    impl Checkpointer for LarkBitableCheckpointer {
60        async fn put(
61            &self,
62            config: &CheckpointConfig,
63            checkpoint: &Checkpoint,
64        ) -> Result<(), SynapticError> {
65            let state_str = serde_json::to_string(&checkpoint.state)
66                .map_err(|e| SynapticError::Graph(format!("serialize state: {e}")))?;
67            let meta_str = serde_json::to_string(&checkpoint.metadata)
68                .map_err(|e| SynapticError::Graph(format!("serialize metadata: {e}")))?;
69            let records = vec![json!({
70                "fields": {
71                    "thread_id": &config.thread_id,
72                    "checkpoint_id": &checkpoint.id,
73                    "parent_id": checkpoint.parent_id.as_deref().unwrap_or(""),
74                    "state": state_str,
75                    "next_node": checkpoint.next_node.as_deref().unwrap_or(""),
76                    "metadata": meta_str,
77                    "created_at": now_ts(),
78                }
79            })];
80            self.api
81                .batch_create_records(&self.app_token, &self.table_id, records)
82                .await
83                .map_err(|e| SynapticError::Graph(e.to_string()))?;
84            Ok(())
85        }
86
87        async fn get(
88            &self,
89            config: &CheckpointConfig,
90        ) -> Result<Option<Checkpoint>, SynapticError> {
91            let body = json!({
92                "page_size": 1,
93                "filter": {
94                    "conjunction": "and",
95                    "conditions": [{
96                        "field_name": "thread_id",
97                        "operator": "is",
98                        "value": [&config.thread_id]
99                    }]
100                },
101                "sort": [{ "field_name": "created_at", "desc": true }]
102            });
103            let items = self
104                .api
105                .search_records(&self.app_token, &self.table_id, body)
106                .await
107                .map_err(|e| SynapticError::Graph(e.to_string()))?;
108            match items.into_iter().next() {
109                None => Ok(None),
110                Some(item) => Ok(Some(record_to_checkpoint(&item)?)),
111            }
112        }
113
114        async fn list(&self, config: &CheckpointConfig) -> Result<Vec<Checkpoint>, SynapticError> {
115            let body = json!({
116                "page_size": 100,
117                "filter": {
118                    "conjunction": "and",
119                    "conditions": [{
120                        "field_name": "thread_id",
121                        "operator": "is",
122                        "value": [&config.thread_id]
123                    }]
124                },
125                "sort": [{ "field_name": "created_at", "desc": false }]
126            });
127            let items = self
128                .api
129                .search_records(&self.app_token, &self.table_id, body)
130                .await
131                .map_err(|e| SynapticError::Graph(e.to_string()))?;
132            items.iter().map(record_to_checkpoint).collect()
133        }
134    }
135
136    fn now_ts() -> String {
137        std::time::SystemTime::now()
138            .duration_since(std::time::UNIX_EPOCH)
139            .unwrap_or_default()
140            .as_secs()
141            .to_string()
142    }
143
144    fn record_to_checkpoint(item: &serde_json::Value) -> Result<Checkpoint, SynapticError> {
145        let f = &item["fields"];
146        let state: serde_json::Value = serde_json::from_str(f["state"].as_str().unwrap_or("{}"))
147            .map_err(|e| SynapticError::Graph(format!("deserialize state: {e}")))?;
148        let metadata: std::collections::HashMap<String, serde_json::Value> =
149            serde_json::from_str(f["metadata"].as_str().unwrap_or("{}")).unwrap_or_default();
150        let next_node = f["next_node"]
151            .as_str()
152            .filter(|s| !s.is_empty())
153            .map(String::from);
154        let parent_id = f["parent_id"]
155            .as_str()
156            .filter(|s| !s.is_empty())
157            .map(String::from);
158        let id = f["checkpoint_id"].as_str().unwrap_or("").to_string();
159        Ok(Checkpoint {
160            id,
161            state,
162            next_node,
163            parent_id,
164            metadata,
165        })
166    }
167}