Skip to main content

synap_sdk/
transactions.rs

1//! Redis-compatible transaction support (MULTI/EXEC/WATCH/DISCARD)
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use serde::{Deserialize, Serialize};
6use serde_json::{Value, json};
7
8/// Options for transaction commands
9#[derive(Debug, Clone, Default)]
10pub struct TransactionOptions {
11    pub client_id: Option<String>,
12}
13
14impl TransactionOptions {
15    fn into_payload(self) -> Value {
16        match self.client_id {
17            Some(client_id) => json!({"client_id": client_id}),
18            None => json!({}),
19        }
20    }
21}
22
23/// Standard response for MULTI/DISCARD/WATCH/UNWATCH
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct TransactionResponse {
26    pub success: bool,
27    #[serde(default)]
28    pub message: Option<String>,
29}
30
31/// Result returned by EXEC
32#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(untagged)]
34pub enum TransactionExecResult {
35    Success {
36        results: Vec<Value>,
37    },
38    Aborted {
39        aborted: bool,
40        #[serde(default)]
41        message: Option<String>,
42    },
43}
44
45/// Helper for sending raw commands within a transaction
46pub struct TransactionCommandClient {
47    client: SynapClient,
48    client_id: String,
49}
50
51impl TransactionCommandClient {
52    /// Send a raw command ensuring the `client_id` is attached
53    pub async fn send_command(&self, command: &str, mut payload: Value) -> Result<Value> {
54        if let Value::Object(ref mut map) = payload {
55            map.insert(
56                "client_id".to_string(),
57                Value::String(self.client_id.clone()),
58            );
59        }
60
61        self.client.send_command(command, payload).await
62    }
63
64    /// Access underlying client id
65    pub fn client_id(&self) -> &str {
66        &self.client_id
67    }
68}
69
70/// Transaction manager exposing MULTI/EXEC/WATCH/DISCARD
71#[derive(Clone)]
72pub struct TransactionManager {
73    client: SynapClient,
74}
75
76impl TransactionManager {
77    pub(crate) fn new(client: SynapClient) -> Self {
78        Self { client }
79    }
80
81    /// Start a transaction (MULTI)
82    pub async fn multi(&self, options: TransactionOptions) -> Result<TransactionResponse> {
83        let response = self
84            .client
85            .send_command("transaction.multi", options.clone().into_payload())
86            .await?;
87        Self::parse_response(response)
88    }
89
90    /// Discard an active transaction (DISCARD)
91    pub async fn discard(&self, options: TransactionOptions) -> Result<TransactionResponse> {
92        let response = self
93            .client
94            .send_command("transaction.discard", options.clone().into_payload())
95            .await?;
96        Self::parse_response(response)
97    }
98
99    /// Watch keys for optimistic locking (WATCH)
100    pub async fn watch(
101        &self,
102        keys: &[impl AsRef<str>],
103        options: TransactionOptions,
104    ) -> Result<TransactionResponse> {
105        let mut payload = options.into_payload();
106        if let Value::Object(ref mut map) = payload {
107            map.insert(
108                "keys".into(),
109                Value::Array(
110                    keys.iter()
111                        .map(|k| Value::String(k.as_ref().to_string()))
112                        .collect(),
113                ),
114            );
115        }
116
117        let response = self
118            .client
119            .send_command("transaction.watch", payload)
120            .await?;
121        Self::parse_response(response)
122    }
123
124    /// Remove all watched keys (UNWATCH)
125    pub async fn unwatch(&self, options: TransactionOptions) -> Result<TransactionResponse> {
126        let response = self
127            .client
128            .send_command("transaction.unwatch", options.clone().into_payload())
129            .await?;
130        Self::parse_response(response)
131    }
132
133    /// Execute queued commands (EXEC)
134    pub async fn exec(&self, options: TransactionOptions) -> Result<TransactionExecResult> {
135        let response = self
136            .client
137            .send_command("transaction.exec", options.clone().into_payload())
138            .await?;
139
140        if response["results"].is_array() {
141            let result = serde_json::from_value::<Vec<Value>>(response["results"].clone())?;
142            return Ok(TransactionExecResult::Success { results: result });
143        }
144
145        let aborted = response["aborted"].as_bool().unwrap_or(true);
146        let message = response["message"].as_str().map(|s| s.to_string());
147        Ok(TransactionExecResult::Aborted { aborted, message })
148    }
149
150    /// Create a helper client that automatically injects `client_id` for raw commands
151    pub fn command_client(&self, client_id: impl Into<String>) -> TransactionCommandClient {
152        TransactionCommandClient {
153            client: self.client.clone(),
154            client_id: client_id.into(),
155        }
156    }
157
158    fn parse_response(response: Value) -> Result<TransactionResponse> {
159        let success = response["success"].as_bool().unwrap_or(true);
160        let message = response["message"].as_str().map(|s| s.to_string());
161        Ok(TransactionResponse { success, message })
162    }
163}