celerix_store/sdk/
client.rs

1use std::collections::HashMap;
2use async_trait::async_trait;
3use tokio::net::TcpStream;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
6use crate::engine::vault;
7use tokio::sync::Mutex;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11pub struct Client {
12    #[allow(dead_code)]
13    addr: String,
14    inner: Mutex<Option<ClientInner>>,
15}
16
17struct ClientInner {
18    reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
19    writer: tokio::net::tcp::OwnedWriteHalf,
20}
21
22impl Client {
23    pub async fn connect(addr: &str) -> Result<Self> {
24        let inner = Client::connect_inner(addr).await?;
25        Ok(Self {
26            addr: addr.to_string(),
27            inner: Mutex::new(Some(inner)),
28        })
29    }
30
31    async fn send_and_receive(&self, cmd: String) -> Result<String> {
32        let mut inner_guard = self.inner.lock().await;
33        
34        // Retry logic
35        for i in 0..3 {
36            if inner_guard.is_none() {
37                match Client::connect_inner(&self.addr).await {
38                    Ok(inner) => *inner_guard = Some(inner),
39                    Err(e) => {
40                        if i == 2 { return Err(e); }
41                        tokio::time::sleep(std::time::Duration::from_millis((i + 1) * 200)).await;
42                        continue;
43                    }
44                }
45            }
46
47            let inner = inner_guard.as_mut().unwrap();
48            if let Err(_) = inner.writer.write_all(format!("{}\n", cmd).as_bytes()).await {
49                 *inner_guard = None;
50                 continue;
51            }
52
53            let mut resp = String::new();
54            match inner.reader.read_line(&mut resp).await {
55                Ok(0) => {
56                    *inner_guard = None;
57                    continue;
58                }
59                Ok(_) => {
60                    let resp = resp.trim();
61                    if resp.starts_with("ERR") {
62                        return Err(Error::Internal(resp[4..].to_string()));
63                    }
64                    return Ok(resp.to_string());
65                }
66                Err(_) => {
67                    *inner_guard = None;
68                    continue;
69                }
70            }
71        }
72        
73        Err(Error::Internal("failed after 3 attempts".to_string()))
74    }
75
76    async fn connect_inner(addr: &str) -> Result<ClientInner> {
77        let stream = TcpStream::connect(addr).await?;
78        let (reader, writer) = stream.into_split();
79        Ok(ClientInner {
80            reader: BufReader::new(reader),
81            writer,
82        })
83    }
84
85    pub async fn get_generic<T: DeserializeOwned>(&self, persona_id: &str, app_id: &str, key: &str) -> Result<T> {
86        let val = self.get(persona_id, app_id, key).await?;
87        Ok(serde_json::from_value(val)?)
88    }
89
90    pub async fn set_generic<T: Serialize>(&self, persona_id: &str, app_id: &str, key: &str, value: T) -> Result<()> {
91        let val = serde_json::to_value(value)?;
92        self.set(persona_id, app_id, key, val).await
93    }
94}
95
96#[async_trait]
97impl KVReader for Client {
98    async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
99        let resp = self.send_and_receive(format!("GET {} {} {}", persona_id, app_id, key)).await?;
100        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
101        Ok(serde_json::from_str(json_data)?)
102    }
103}
104
105#[async_trait]
106impl KVWriter for Client {
107    async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
108        let val_str = serde_json::to_string(&value)?;
109        self.send_and_receive(format!("SET {} {} {} {}", persona_id, app_id, key, val_str)).await?;
110        Ok(())
111    }
112
113    async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
114        self.send_and_receive(format!("DEL {} {} {}", persona_id, app_id, key)).await?;
115        Ok(())
116    }
117}
118
119#[async_trait]
120impl AppEnumeration for Client {
121    async fn get_personas(&self) -> Result<Vec<String>> {
122        let resp = self.send_and_receive("LIST_PERSONAS".to_string()).await?;
123        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
124        Ok(serde_json::from_str(json_data)?)
125    }
126
127    async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
128        let resp = self.send_and_receive(format!("LIST_APPS {}", persona_id)).await?;
129        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
130        Ok(serde_json::from_str(json_data)?)
131    }
132}
133
134#[async_trait]
135impl BatchExporter for Client {
136    async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
137        let resp = self.send_and_receive(format!("DUMP {} {}", persona_id, app_id)).await?;
138        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
139        Ok(serde_json::from_str(json_data)?)
140    }
141
142    async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
143        let resp = self.send_and_receive(format!("DUMP_APP {}", app_id)).await?;
144        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
145        Ok(serde_json::from_str(json_data)?)
146    }
147}
148
149#[async_trait]
150impl GlobalSearcher for Client {
151    async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
152        let resp = self.send_and_receive(format!("GET_GLOBAL {} {}", app_id, key)).await?;
153        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
154        let out: serde_json::Value = serde_json::from_str(json_data)?;
155        let persona = out["persona"].as_str().ok_or_else(|| Error::Internal("Missing persona".to_string()))?.to_string();
156        let value = out["value"].clone();
157        Ok((value, persona))
158    }
159}
160
161#[async_trait]
162impl Orchestrator for Client {
163    async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
164        self.send_and_receive(format!("MOVE {} {} {} {}", src_persona, dst_persona, app_id, key)).await?;
165        Ok(())
166    }
167}
168
169impl CelerixStore for Client {
170    fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
171        Box::new(RemoteAppScope {
172            client: self,
173            persona_id: persona_id.to_string(),
174            app_id: app_id.to_string(),
175        })
176    }
177}
178
179pub struct RemoteAppScope<'a> {
180    client: &'a Client,
181    persona_id: String,
182    app_id: String,
183}
184
185#[async_trait]
186impl<'a> AppScope for RemoteAppScope<'a> {
187    async fn get(&self, key: &str) -> Result<serde_json::Value> {
188        self.client.get(&self.persona_id, &self.app_id, key).await
189    }
190
191    async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
192        self.client.set(&self.persona_id, &self.app_id, key, value).await
193    }
194
195    async fn delete(&self, key: &str) -> Result<()> {
196        self.client.delete(&self.persona_id, &self.app_id, key).await
197    }
198
199    fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
200        Box::new(RemoteVaultScope {
201            app: self,
202            master_key: master_key.to_vec(),
203        })
204    }
205}
206
207pub struct RemoteVaultScope<'a> {
208    app: &'a RemoteAppScope<'a>,
209    master_key: Vec<u8>,
210}
211
212#[async_trait]
213impl<'a> VaultScope for RemoteVaultScope<'a> {
214    async fn get(&self, key: &str) -> Result<String> {
215        let val = self.app.get(key).await?;
216        let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
217        vault::decrypt(cipher_hex, &self.master_key)
218    }
219
220    async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
221        let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
222        self.app.set(key, serde_json::Value::String(cipher_hex)).await
223    }
224}