celerix-store 0.1.2

A lightweight, low-latency KV data store with 1:1 parity with the orginal golang version, atomic persistence, and AES-256-GCM encryption.
Documentation
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use crate::{Result, Error, KVReader, KVWriter, AppEnumeration, BatchExporter, GlobalSearcher, Orchestrator, CelerixStore, AppScope, VaultScope};
use crate::engine::vault;
use tokio::sync::Mutex;
use serde::de::DeserializeOwned;
use serde::Serialize;

/// A remote client for the Celerix Store.
/// 
/// `Client` implements the [`CelerixStore`] trait and communicates with a 
/// `celerix-stored` daemon over TCP. It features automatic reconnection and 
/// exponential backoff retries.
pub struct Client {
    #[allow(dead_code)]
    addr: String,
    inner: Mutex<Option<ClientInner>>,
}

struct ClientInner {
    reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
    writer: tokio::net::tcp::OwnedWriteHalf,
}

impl Client {
    /// Connects to a remote Celerix Store daemon at the specified address.
    pub async fn connect(addr: &str) -> Result<Self> {
        let inner = Client::connect_inner(addr).await?;
        Ok(Self {
            addr: addr.to_string(),
            inner: Mutex::new(Some(inner)),
        })
    }

    async fn send_and_receive(&self, cmd: String) -> Result<String> {
        let mut inner_guard = self.inner.lock().await;
        
        // Retry logic
        for i in 0..3 {
            if inner_guard.is_none() {
                match Client::connect_inner(&self.addr).await {
                    Ok(inner) => *inner_guard = Some(inner),
                    Err(e) => {
                        if i == 2 { return Err(e); }
                        tokio::time::sleep(std::time::Duration::from_millis((i + 1) * 200)).await;
                        continue;
                    }
                }
            }

            let inner = inner_guard.as_mut().unwrap();
            if let Err(_) = inner.writer.write_all(format!("{}\n", cmd).as_bytes()).await {
                 *inner_guard = None;
                 continue;
            }

            let mut resp = String::new();
            match inner.reader.read_line(&mut resp).await {
                Ok(0) => {
                    *inner_guard = None;
                    continue;
                }
                Ok(_) => {
                    let resp = resp.trim();
                    if resp.starts_with("ERR") {
                        return Err(Error::Internal(resp[4..].to_string()));
                    }
                    return Ok(resp.to_string());
                }
                Err(_) => {
                    *inner_guard = None;
                    continue;
                }
            }
        }
        
        Err(Error::Internal("failed after 3 attempts".to_string()))
    }

    async fn connect_inner(addr: &str) -> Result<ClientInner> {
        let stream = TcpStream::connect(addr).await?;
        let (reader, writer) = stream.into_split();
        Ok(ClientInner {
            reader: BufReader::new(reader),
            writer,
        })
    }

    /// Retrieves a type-safe value using generics.
    /// 
    /// Automatically handles JSON deserialization into the target type.
    pub async fn get_generic<T: DeserializeOwned>(&self, persona_id: &str, app_id: &str, key: &str) -> Result<T> {
        let val = self.get(persona_id, app_id, key).await?;
        Ok(serde_json::from_value(val)?)
    }

    /// Stores a type-safe value using generics.
    /// 
    /// Automatically handles JSON serialization of the value.
    pub async fn set_generic<T: Serialize>(&self, persona_id: &str, app_id: &str, key: &str, value: T) -> Result<()> {
        let val = serde_json::to_value(value)?;
        self.set(persona_id, app_id, key, val).await
    }
}

#[async_trait]
impl KVReader for Client {
    async fn get(&self, persona_id: &str, app_id: &str, key: &str) -> Result<serde_json::Value> {
        let resp = self.send_and_receive(format!("GET {} {} {}", persona_id, app_id, key)).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        Ok(serde_json::from_str(json_data)?)
    }
}

#[async_trait]
impl KVWriter for Client {
    async fn set(&self, persona_id: &str, app_id: &str, key: &str, value: serde_json::Value) -> Result<()> {
        let val_str = serde_json::to_string(&value)?;
        self.send_and_receive(format!("SET {} {} {} {}", persona_id, app_id, key, val_str)).await?;
        Ok(())
    }

    async fn delete(&self, persona_id: &str, app_id: &str, key: &str) -> Result<()> {
        self.send_and_receive(format!("DEL {} {} {}", persona_id, app_id, key)).await?;
        Ok(())
    }
}

#[async_trait]
impl AppEnumeration for Client {
    async fn get_personas(&self) -> Result<Vec<String>> {
        let resp = self.send_and_receive("LIST_PERSONAS".to_string()).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        Ok(serde_json::from_str(json_data)?)
    }

    async fn get_apps(&self, persona_id: &str) -> Result<Vec<String>> {
        let resp = self.send_and_receive(format!("LIST_APPS {}", persona_id)).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        Ok(serde_json::from_str(json_data)?)
    }
}

#[async_trait]
impl BatchExporter for Client {
    async fn get_app_store(&self, persona_id: &str, app_id: &str) -> Result<HashMap<String, serde_json::Value>> {
        let resp = self.send_and_receive(format!("DUMP {} {}", persona_id, app_id)).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        Ok(serde_json::from_str(json_data)?)
    }

    async fn dump_app(&self, app_id: &str) -> Result<HashMap<String, HashMap<String, serde_json::Value>>> {
        let resp = self.send_and_receive(format!("DUMP_APP {}", app_id)).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        Ok(serde_json::from_str(json_data)?)
    }
}

#[async_trait]
impl GlobalSearcher for Client {
    async fn get_global(&self, app_id: &str, key: &str) -> Result<(serde_json::Value, String)> {
        let resp = self.send_and_receive(format!("GET_GLOBAL {} {}", app_id, key)).await?;
        let json_data = resp.strip_prefix("OK ").ok_or_else(|| Error::Internal("Invalid response".to_string()))?;
        let out: serde_json::Value = serde_json::from_str(json_data)?;
        let persona = out["persona"].as_str().ok_or_else(|| Error::Internal("Missing persona".to_string()))?.to_string();
        let value = out["value"].clone();
        Ok((value, persona))
    }
}

#[async_trait]
impl Orchestrator for Client {
    async fn move_key(&self, src_persona: &str, dst_persona: &str, app_id: &str, key: &str) -> Result<()> {
        self.send_and_receive(format!("MOVE {} {} {} {}", src_persona, dst_persona, app_id, key)).await?;
        Ok(())
    }
}

impl CelerixStore for Client {
    fn app(&self, persona_id: &str, app_id: &str) -> Box<dyn AppScope + '_> {
        Box::new(RemoteAppScope {
            client: self,
            persona_id: persona_id.to_string(),
            app_id: app_id.to_string(),
        })
    }
}

pub struct RemoteAppScope<'a> {
    client: &'a Client,
    persona_id: String,
    app_id: String,
}

#[async_trait]
impl<'a> AppScope for RemoteAppScope<'a> {
    async fn get(&self, key: &str) -> Result<serde_json::Value> {
        self.client.get(&self.persona_id, &self.app_id, key).await
    }

    async fn set(&self, key: &str, value: serde_json::Value) -> Result<()> {
        self.client.set(&self.persona_id, &self.app_id, key, value).await
    }

    async fn delete(&self, key: &str) -> Result<()> {
        self.client.delete(&self.persona_id, &self.app_id, key).await
    }

    fn vault(&self, master_key: &[u8]) -> Box<dyn VaultScope + '_> {
        Box::new(RemoteVaultScope {
            app: self,
            master_key: master_key.to_vec(),
        })
    }
}

pub struct RemoteVaultScope<'a> {
    app: &'a RemoteAppScope<'a>,
    master_key: Vec<u8>,
}

#[async_trait]
impl<'a> VaultScope for RemoteVaultScope<'a> {
    async fn get(&self, key: &str) -> Result<String> {
        let val = self.app.get(key).await?;
        let cipher_hex = val.as_str().ok_or_else(|| Error::Internal("Vault data is not a string".to_string()))?;
        vault::decrypt(cipher_hex, &self.master_key)
    }

    async fn set(&self, key: &str, plaintext: &str) -> Result<()> {
        let cipher_hex = vault::encrypt(plaintext, &self.master_key)?;
        self.app.set(key, serde_json::Value::String(cipher_hex)).await
    }
}