1use std::collections::HashMap;
2use async_trait::async_trait;
3use tokio::net::TcpStream;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
6use crate::engine::vault;
7use tokio::sync::Mutex;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11pub struct Client {
17 #[allow(dead_code)]
18 addr: String,
19 inner: Mutex<Option<ClientInner>>,
20}
21
22struct ClientInner {
23 reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
24 writer: tokio::net::tcp::OwnedWriteHalf,
25}
26
27impl Client {
28 pub async fn connect(addr: &str) -> Result<Self> {
30 let inner = Client::connect_inner(addr).await?;
31 Ok(Self {
32 addr: addr.to_string(),
33 inner: Mutex::new(Some(inner)),
34 })
35 }
36
37 async fn send_and_receive(&self, cmd: String) -> Result<String> {
38 let mut inner_guard = self.inner.lock().await;
39
40 for i in 0..3 {
42 if inner_guard.is_none() {
43 match Client::connect_inner(&self.addr).await {
44 Ok(inner) => *inner_guard = Some(inner),
45 Err(e) => {
46 if i == 2 { return Err(e); }
47 tokio::time::sleep(std::time::Duration::from_millis((i + 1) * 200)).await;
48 continue;
49 }
50 }
51 }
52
53 let inner = inner_guard.as_mut().unwrap();
54 if let Err(_) = inner.writer.write_all(format!("{}\n", cmd).as_bytes()).await {
55 *inner_guard = None;
56 continue;
57 }
58
59 let mut resp = String::new();
60 match inner.reader.read_line(&mut resp).await {
61 Ok(0) => {
62 *inner_guard = None;
63 continue;
64 }
65 Ok(_) => {
66 let resp = resp.trim();
67 if resp.starts_with("ERR") {
68 return Err(Error::Internal(resp[4..].to_string()));
69 }
70 return Ok(resp.to_string());
71 }
72 Err(_) => {
73 *inner_guard = None;
74 continue;
75 }
76 }
77 }
78
79 Err(Error::Internal("failed after 3 attempts".to_string()))
80 }
81
82 async fn connect_inner(addr: &str) -> Result<ClientInner> {
83 let stream = TcpStream::connect(addr).await?;
84 let (reader, writer) = stream.into_split();
85 Ok(ClientInner {
86 reader: BufReader::new(reader),
87 writer,
88 })
89 }
90
91 pub async fn get_generic<T: DeserializeOwned>(&self, persona_id: &str, app_id: &str, key: &str) -> Result<T> {
95 let val = self.get(persona_id, app_id, key).await?;
96 Ok(serde_json::from_value(val)?)
97 }
98
99 pub async fn set_generic<T: Serialize>(&self, persona_id: &str, app_id: &str, key: &str, value: T) -> Result<()> {
103 let val = serde_json::to_value(value)?;
104 self.set(persona_id, app_id, key, val).await
105 }
106}
107
108#[async_trait]
109impl KVReader for Client {
110 async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
111 let resp = self.send_and_receive(format!("GET {} {} {}", persona_id, app_id, key)).await?;
112 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
113 Ok(serde_json::from_str(json_data)?)
114 }
115}
116
117#[async_trait]
118impl KVWriter for Client {
119 async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
120 let val_str = serde_json::to_string(&value)?;
121 self.send_and_receive(format!("SET {} {} {} {}", persona_id, app_id, key, val_str)).await?;
122 Ok(())
123 }
124
125 async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
126 self.send_and_receive(format!("DEL {} {} {}", persona_id, app_id, key)).await?;
127 Ok(())
128 }
129}
130
131#[async_trait]
132impl AppEnumeration for Client {
133 async fn get_personas(&self) -> Result<Vec<String>> {
134 let resp = self.send_and_receive("LIST_PERSONAS".to_string()).await?;
135 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
136 Ok(serde_json::from_str(json_data)?)
137 }
138
139 async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
140 let resp = self.send_and_receive(format!("LIST_APPS {}", persona_id)).await?;
141 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
142 Ok(serde_json::from_str(json_data)?)
143 }
144}
145
146#[async_trait]
147impl BatchExporter for Client {
148 async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
149 let resp = self.send_and_receive(format!("DUMP {} {}", persona_id, app_id)).await?;
150 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
151 Ok(serde_json::from_str(json_data)?)
152 }
153
154 async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
155 let resp = self.send_and_receive(format!("DUMP_APP {}", app_id)).await?;
156 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
157 Ok(serde_json::from_str(json_data)?)
158 }
159}
160
161#[async_trait]
162impl GlobalSearcher for Client {
163 async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
164 let resp = self.send_and_receive(format!("GET_GLOBAL {} {}", app_id, key)).await?;
165 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
166 let out: serde_json::Value = serde_json::from_str(json_data)?;
167 let persona = out["persona"].as_str().ok_or_else(|| Error::Internal("Missing persona".to_string()))?.to_string();
168 let value = out["value"].clone();
169 Ok((value, persona))
170 }
171}
172
173#[async_trait]
174impl Orchestrator for Client {
175 async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
176 self.send_and_receive(format!("MOVE {} {} {} {}", src_persona, dst_persona, app_id, key)).await?;
177 Ok(())
178 }
179}
180
181impl CelerixStore for Client {
182 fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
183 Box::new(RemoteAppScope {
184 client: self,
185 persona_id: persona_id.to_string(),
186 app_id: app_id.to_string(),
187 })
188 }
189}
190
191pub struct RemoteAppScope<'a> {
192 client: &'a Client,
193 persona_id: String,
194 app_id: String,
195}
196
197#[async_trait]
198impl<'a> AppScope for RemoteAppScope<'a> {
199 async fn get(&self, key: &str) -> Result<serde_json::Value> {
200 self.client.get(&self.persona_id, &self.app_id, key).await
201 }
202
203 async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
204 self.client.set(&self.persona_id, &self.app_id, key, value).await
205 }
206
207 async fn delete(&self, key: &str) -> Result<()> {
208 self.client.delete(&self.persona_id, &self.app_id, key).await
209 }
210
211 fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
212 Box::new(RemoteVaultScope {
213 app: self,
214 master_key: master_key.to_vec(),
215 })
216 }
217}
218
219pub struct RemoteVaultScope<'a> {
220 app: &'a RemoteAppScope<'a>,
221 master_key: Vec<u8>,
222}
223
224#[async_trait]
225impl<'a> VaultScope for RemoteVaultScope<'a> {
226 async fn get(&self, key: &str) -> Result<String> {
227 let val = self.app.get(key).await?;
228 let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
229 vault::decrypt(cipher_hex, &self.master_key)
230 }
231
232 async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
233 let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
234 self.app.set(key, serde_json::Value::String(cipher_hex)).await
235 }
236}