1use std::collections::HashMap;
2use async_trait::async_trait;
3use tokio::net::TcpStream;
4use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
5use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
6use crate::engine::vault;
7use tokio::sync::Mutex;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10
11pub struct Client {
12 #[allow(dead_code)]
13 addr: String,
14 inner: Mutex<Option<ClientInner>>,
15}
16
17struct ClientInner {
18 reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
19 writer: tokio::net::tcp::OwnedWriteHalf,
20}
21
22impl Client {
23 pub async fn connect(addr: &str) -> Result<Self> {
24 let inner = Client::connect_inner(addr).await?;
25 Ok(Self {
26 addr: addr.to_string(),
27 inner: Mutex::new(Some(inner)),
28 })
29 }
30
31 async fn send_and_receive(&self, cmd: String) -> Result<String> {
32 let mut inner_guard = self.inner.lock().await;
33
34 for i in 0..3 {
36 if inner_guard.is_none() {
37 match Client::connect_inner(&self.addr).await {
38 Ok(inner) => *inner_guard = Some(inner),
39 Err(e) => {
40 if i == 2 { return Err(e); }
41 tokio::time::sleep(std::time::Duration::from_millis((i + 1) * 200)).await;
42 continue;
43 }
44 }
45 }
46
47 let inner = inner_guard.as_mut().unwrap();
48 if let Err(_) = inner.writer.write_all(format!("{}\n", cmd).as_bytes()).await {
49 *inner_guard = None;
50 continue;
51 }
52
53 let mut resp = String::new();
54 match inner.reader.read_line(&mut resp).await {
55 Ok(0) => {
56 *inner_guard = None;
57 continue;
58 }
59 Ok(_) => {
60 let resp = resp.trim();
61 if resp.starts_with("ERR") {
62 return Err(Error::Internal(resp[4..].to_string()));
63 }
64 return Ok(resp.to_string());
65 }
66 Err(_) => {
67 *inner_guard = None;
68 continue;
69 }
70 }
71 }
72
73 Err(Error::Internal("failed after 3 attempts".to_string()))
74 }
75
76 async fn connect_inner(addr: &str) -> Result<ClientInner> {
77 let stream = TcpStream::connect(addr).await?;
78 let (reader, writer) = stream.into_split();
79 Ok(ClientInner {
80 reader: BufReader::new(reader),
81 writer,
82 })
83 }
84
85 pub async fn get_generic<T: DeserializeOwned>(&self, persona_id: &str, app_id: &str, key: &str) -> Result<T> {
86 let val = self.get(persona_id, app_id, key).await?;
87 Ok(serde_json::from_value(val)?)
88 }
89
90 pub async fn set_generic<T: Serialize>(&self, persona_id: &str, app_id: &str, key: &str, value: T) -> Result<()> {
91 let val = serde_json::to_value(value)?;
92 self.set(persona_id, app_id, key, val).await
93 }
94}
95
96#[async_trait]
97impl KVReader for Client {
98 async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
99 let resp = self.send_and_receive(format!("GET {} {} {}", persona_id, app_id, key)).await?;
100 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
101 Ok(serde_json::from_str(json_data)?)
102 }
103}
104
105#[async_trait]
106impl KVWriter for Client {
107 async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
108 let val_str = serde_json::to_string(&value)?;
109 self.send_and_receive(format!("SET {} {} {} {}", persona_id, app_id, key, val_str)).await?;
110 Ok(())
111 }
112
113 async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
114 self.send_and_receive(format!("DEL {} {} {}", persona_id, app_id, key)).await?;
115 Ok(())
116 }
117}
118
119#[async_trait]
120impl AppEnumeration for Client {
121 async fn get_personas(&self) -> Result<Vec<String>> {
122 let resp = self.send_and_receive("LIST_PERSONAS".to_string()).await?;
123 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
124 Ok(serde_json::from_str(json_data)?)
125 }
126
127 async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
128 let resp = self.send_and_receive(format!("LIST_APPS {}", persona_id)).await?;
129 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
130 Ok(serde_json::from_str(json_data)?)
131 }
132}
133
134#[async_trait]
135impl BatchExporter for Client {
136 async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
137 let resp = self.send_and_receive(format!("DUMP {} {}", persona_id, app_id)).await?;
138 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
139 Ok(serde_json::from_str(json_data)?)
140 }
141
142 async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
143 let resp = self.send_and_receive(format!("DUMP_APP {}", app_id)).await?;
144 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
145 Ok(serde_json::from_str(json_data)?)
146 }
147}
148
149#[async_trait]
150impl GlobalSearcher for Client {
151 async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
152 let resp = self.send_and_receive(format!("GET_GLOBAL {} {}", app_id, key)).await?;
153 let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
154 let out: serde_json::Value = serde_json::from_str(json_data)?;
155 let persona = out["persona"].as_str().ok_or_else(|| Error::Internal("Missing persona".to_string()))?.to_string();
156 let value = out["value"].clone();
157 Ok((value, persona))
158 }
159}
160
161#[async_trait]
162impl Orchestrator for Client {
163 async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
164 self.send_and_receive(format!("MOVE {} {} {} {}", src_persona, dst_persona, app_id, key)).await?;
165 Ok(())
166 }
167}
168
169impl CelerixStore for Client {
170 fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
171 Box::new(RemoteAppScope {
172 client: self,
173 persona_id: persona_id.to_string(),
174 app_id: app_id.to_string(),
175 })
176 }
177}
178
179pub struct RemoteAppScope<'a> {
180 client: &'a Client,
181 persona_id: String,
182 app_id: String,
183}
184
185#[async_trait]
186impl<'a> AppScope for RemoteAppScope<'a> {
187 async fn get(&self, key: &str) -> Result<serde_json::Value> {
188 self.client.get(&self.persona_id, &self.app_id, key).await
189 }
190
191 async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
192 self.client.set(&self.persona_id, &self.app_id, key, value).await
193 }
194
195 async fn delete(&self, key: &str) -> Result<()> {
196 self.client.delete(&self.persona_id, &self.app_id, key).await
197 }
198
199 fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
200 Box::new(RemoteVaultScope {
201 app: self,
202 master_key: master_key.to_vec(),
203 })
204 }
205}
206
207pub struct RemoteVaultScope<'a> {
208 app: &'a RemoteAppScope<'a>,
209 master_key: Vec<u8>,
210}
211
212#[async_trait]
213impl<'a> VaultScope for RemoteVaultScope<'a> {
214 async fn get(&self, key: &str) -> Result<String> {
215 let val = self.app.get(key).await?;
216 let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
217 vault::decrypt(cipher_hex, &self.master_key)
218 }
219
220 async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
221 let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
222 self.app.set(key, serde_json::Value::String(cipher_hex)).await
223 }
224}