celerix_store/sdk/
client.rs

1use std::collections::HashMap;
2use async_trait::async_trait;
3use tokio::net::TcpStream;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
6use crate::engine::vault;
7use tokio::sync::Mutex;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11/// A remote client for the Celerix Store.
12/// 
13/// `Client` implements the [`CelerixStore`] trait and communicates with a 
14/// `celerix-stored` daemon over TCP. It features automatic reconnection and 
15/// exponential backoff retries.
16pub struct Client {
17    #[allow(dead_code)]
18    addr: String,
19    inner: Mutex<Option<ClientInner>>,
20}
21
22struct ClientInner {
23    reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
24    writer: tokio::net::tcp::OwnedWriteHalf,
25}
26
27impl Client {
28    /// Connects to a remote Celerix Store daemon at the specified address.
29    pub async fn connect(addr: &str) -> Result<Self> {
30        let inner = Client::connect_inner(addr).await?;
31        Ok(Self {
32            addr: addr.to_string(),
33            inner: Mutex::new(Some(inner)),
34        })
35    }
36
37    async fn send_and_receive(&self, cmd: String) -> Result<String> {
38        let mut inner_guard = self.inner.lock().await;
39        
40        // Retry logic
41        for i in 0..3 {
42            if inner_guard.is_none() {
43                match Client::connect_inner(&self.addr).await {
44                    Ok(inner) => *inner_guard = Some(inner),
45                    Err(e) => {
46                        if i == 2 { return Err(e); }
47                        tokio::time::sleep(std::time::Duration::from_millis((i + 1) * 200)).await;
48                        continue;
49                    }
50                }
51            }
52
53            let inner = inner_guard.as_mut().unwrap();
54            if let Err(_) = inner.writer.write_all(format!("{}\n", cmd).as_bytes()).await {
55                 *inner_guard = None;
56                 continue;
57            }
58
59            let mut resp = String::new();
60            match inner.reader.read_line(&mut resp).await {
61                Ok(0) => {
62                    *inner_guard = None;
63                    continue;
64                }
65                Ok(_) => {
66                    let resp = resp.trim();
67                    if resp.starts_with("ERR") {
68                        return Err(Error::Internal(resp[4..].to_string()));
69                    }
70                    return Ok(resp.to_string());
71                }
72                Err(_) => {
73                    *inner_guard = None;
74                    continue;
75                }
76            }
77        }
78        
79        Err(Error::Internal("failed after 3 attempts".to_string()))
80    }
81
82    async fn connect_inner(addr: &str) -> Result<ClientInner> {
83        let stream = TcpStream::connect(addr).await?;
84        let (reader, writer) = stream.into_split();
85        Ok(ClientInner {
86            reader: BufReader::new(reader),
87            writer,
88        })
89    }
90
91    /// Retrieves a type-safe value using generics.
92    /// 
93    /// Automatically handles JSON deserialization into the target type.
94    pub async fn get_generic<T: DeserializeOwned>(&self, persona_id: &str, app_id: &str, key: &str) -> Result<T> {
95        let val = self.get(persona_id, app_id, key).await?;
96        Ok(serde_json::from_value(val)?)
97    }
98
99    /// Stores a type-safe value using generics.
100    /// 
101    /// Automatically handles JSON serialization of the value.
102    pub async fn set_generic<T: Serialize>(&self, persona_id: &str, app_id: &str, key: &str, value: T) -> Result<()> {
103        let val = serde_json::to_value(value)?;
104        self.set(persona_id, app_id, key, val).await
105    }
106}
107
108#[async_trait]
109impl KVReader for Client {
110    async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
111        let resp = self.send_and_receive(format!("GET {} {} {}", persona_id, app_id, key)).await?;
112        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
113        Ok(serde_json::from_str(json_data)?)
114    }
115}
116
117#[async_trait]
118impl KVWriter for Client {
119    async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
120        let val_str = serde_json::to_string(&value)?;
121        self.send_and_receive(format!("SET {} {} {} {}", persona_id, app_id, key, val_str)).await?;
122        Ok(())
123    }
124
125    async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
126        self.send_and_receive(format!("DEL {} {} {}", persona_id, app_id, key)).await?;
127        Ok(())
128    }
129}
130
131#[async_trait]
132impl AppEnumeration for Client {
133    async fn get_personas(&self) -> Result<Vec<String>> {
134        let resp = self.send_and_receive("LIST_PERSONAS".to_string()).await?;
135        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
136        Ok(serde_json::from_str(json_data)?)
137    }
138
139    async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
140        let resp = self.send_and_receive(format!("LIST_APPS {}", persona_id)).await?;
141        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
142        Ok(serde_json::from_str(json_data)?)
143    }
144}
145
146#[async_trait]
147impl BatchExporter for Client {
148    async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
149        let resp = self.send_and_receive(format!("DUMP {} {}", persona_id, app_id)).await?;
150        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
151        Ok(serde_json::from_str(json_data)?)
152    }
153
154    async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
155        let resp = self.send_and_receive(format!("DUMP_APP {}", app_id)).await?;
156        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
157        Ok(serde_json::from_str(json_data)?)
158    }
159}
160
161#[async_trait]
162impl GlobalSearcher for Client {
163    async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
164        let resp = self.send_and_receive(format!("GET_GLOBAL {} {}", app_id, key)).await?;
165        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
166        let out: serde_json::Value = serde_json::from_str(json_data)?;
167        let persona = out["persona"].as_str().ok_or_else(|| Error::Internal("Missing persona".to_string()))?.to_string();
168        let value = out["value"].clone();
169        Ok((value, persona))
170    }
171}
172
173#[async_trait]
174impl Orchestrator for Client {
175    async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
176        self.send_and_receive(format!("MOVE {} {} {} {}", src_persona, dst_persona, app_id, key)).await?;
177        Ok(())
178    }
179}
180
181impl CelerixStore for Client {
182    fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
183        Box::new(RemoteAppScope {
184            client: self,
185            persona_id: persona_id.to_string(),
186            app_id: app_id.to_string(),
187        })
188    }
189}
190
191pub struct RemoteAppScope<'a> {
192    client: &'a Client,
193    persona_id: String,
194    app_id: String,
195}
196
197#[async_trait]
198impl<'a> AppScope for RemoteAppScope<'a> {
199    async fn get(&self, key: &str) -> Result<serde_json::Value> {
200        self.client.get(&self.persona_id, &self.app_id, key).await
201    }
202
203    async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
204        self.client.set(&self.persona_id, &self.app_id, key, value).await
205    }
206
207    async fn delete(&self, key: &str) -> Result<()> {
208        self.client.delete(&self.persona_id, &self.app_id, key).await
209    }
210
211    fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
212        Box::new(RemoteVaultScope {
213            app: self,
214            master_key: master_key.to_vec(),
215        })
216    }
217}
218
219pub struct RemoteVaultScope<'a> {
220    app: &'a RemoteAppScope<'a>,
221    master_key: Vec<u8>,
222}
223
224#[async_trait]
225impl<'a> VaultScope for RemoteVaultScope<'a> {
226    async fn get(&self, key: &str) -> Result<String> {
227        let val = self.app.get(key).await?;
228        let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
229        vault::decrypt(cipher_hex, &self.master_key)
230    }
231
232    async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
233        let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
234        self.app.set(key, serde_json::Value::String(cipher_hex)).await
235    }
236}