Skip to main content

synap_sdk/
client.rs

1//! Synap client implementation
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::Client;
7use serde_json::Value;
8use url::Url;
9
10use crate::error::{Result, SynapError};
11use crate::transport::{
12    Resp3Transport, SynapRpcTransport, TransportMode, map_command, map_response,
13};
14use crate::{
15    BitmapManager, GeospatialManager, HashManager, HyperLogLogManager, KVStore, ListManager,
16    PubSubManager, QueueManager, ScriptManager, SetManager, SortedSetManager, StreamManager,
17    TransactionManager,
18};
19
20// ── SynapConfig ───────────────────────────────────────────────────────────────
21
22/// Synap client configuration.
23#[derive(Debug, Clone)]
24pub struct SynapConfig {
25    /// Base URL of the Synap HTTP server (used for HTTP transport and fallback).
26    pub base_url: String,
27    /// Host for the SynapRPC TCP listener (default: `127.0.0.1`).
28    pub rpc_host: String,
29    /// Port for the SynapRPC TCP listener (default: `15501`).
30    pub rpc_port: u16,
31    /// Host for the RESP3 TCP listener (default: `127.0.0.1`).
32    pub resp3_host: String,
33    /// Port for the RESP3 TCP listener (default: `6379`).
34    pub resp3_port: u16,
35    /// Which binary protocol to use as primary transport (default: `SynapRpc`).
36    pub transport: TransportMode,
37    /// Request / connection timeout.
38    pub timeout: Duration,
39    /// Maximum retry attempts for HTTP requests.
40    pub max_retries: u32,
41    /// Optional API key token (Bearer token for HTTP).
42    pub auth_token: Option<String>,
43    /// Optional username for HTTP Basic Auth.
44    pub username: Option<String>,
45    /// Optional password for HTTP Basic Auth.
46    pub password: Option<String>,
47}
48
49impl SynapConfig {
50    /// Create a configuration, inferring the transport from the URL scheme.
51    ///
52    /// | Scheme | Transport | Default port |
53    /// |--------|-----------|--------------|
54    /// | `http://` or `https://` | HTTP REST | — (as given) |
55    /// | `synap://` | SynapRPC (MessagePack/TCP) | 15501 |
56    /// | `resp3://` | RESP3 (Redis wire/TCP) | 6379 |
57    ///
58    /// For `synap://` and `resp3://` URLs the host and port are parsed from
59    /// the URL.  The HTTP base URL is set to `http://<host>:15500` as a
60    /// convenience for any HTTP-only admin calls.
61    ///
62    /// # Examples
63    /// ```
64    /// use synap_sdk::SynapConfig;
65    ///
66    /// let c = SynapConfig::new("synap://localhost:15501");
67    /// let c = SynapConfig::new("resp3://localhost:6379");
68    /// let c = SynapConfig::new("http://localhost:15500");
69    /// ```
70    pub fn new(url: impl Into<String>) -> Self {
71        let raw = url.into();
72
73        // Parse scheme to infer transport without pulling in the full `url` crate
74        // at this point (we just need scheme + host + port).
75        if let Some(rest) = raw.strip_prefix("synap://") {
76            let (host, port) = Self::parse_host_port(rest, 15501);
77            return Self {
78                base_url: format!("http://{}:15500", host),
79                rpc_host: host,
80                rpc_port: port,
81                resp3_host: "127.0.0.1".into(),
82                resp3_port: 6379,
83                transport: TransportMode::SynapRpc,
84                timeout: Duration::from_secs(30),
85                max_retries: 3,
86                auth_token: None,
87                username: None,
88                password: None,
89            };
90        }
91
92        if let Some(rest) = raw.strip_prefix("resp3://") {
93            let (host, port) = Self::parse_host_port(rest, 6379);
94            return Self {
95                base_url: format!("http://{}:15500", host),
96                rpc_host: "127.0.0.1".into(),
97                rpc_port: 15501,
98                resp3_host: host,
99                resp3_port: port,
100                transport: TransportMode::Resp3,
101                timeout: Duration::from_secs(30),
102                max_retries: 3,
103                auth_token: None,
104                username: None,
105                password: None,
106            };
107        }
108
109        // http:// or https:// → HTTP transport
110        Self {
111            base_url: raw,
112            rpc_host: "127.0.0.1".into(),
113            rpc_port: 15501,
114            resp3_host: "127.0.0.1".into(),
115            resp3_port: 6379,
116            transport: TransportMode::Http,
117            timeout: Duration::from_secs(30),
118            max_retries: 3,
119            auth_token: None,
120            username: None,
121            password: None,
122        }
123    }
124
125    /// Parse `"host:port"` from a URL authority string, falling back to
126    /// `default_port` when no port is present.
127    fn parse_host_port(authority: &str, default_port: u16) -> (String, u16) {
128        // Strip any trailing path component.
129        let authority = authority.split('/').next().unwrap_or(authority);
130        if let Some(colon) = authority.rfind(':') {
131            let host = authority[..colon].to_string();
132            let port = authority[colon + 1..]
133                .parse::<u16>()
134                .unwrap_or(default_port);
135            (host, port)
136        } else {
137            (authority.to_string(), default_port)
138        }
139    }
140
141    /// Use the HTTP REST transport only (original SDK behaviour).
142    ///
143    /// # Deprecated
144    /// Prefer passing an `http://` URL to [`SynapConfig::new`].
145    #[deprecated(
146        since = "0.11.0",
147        note = "Pass an `http://` URL to SynapConfig::new instead"
148    )]
149    pub fn with_http_transport(mut self) -> Self {
150        self.transport = TransportMode::Http;
151        self
152    }
153
154    /// Use the SynapRPC binary transport (MessagePack over TCP). This is the
155    /// default and has the lowest latency of the three options.
156    ///
157    /// # Deprecated
158    /// Prefer passing a `synap://` URL to [`SynapConfig::new`].
159    #[deprecated(
160        since = "0.11.0",
161        note = "Pass a `synap://` URL to SynapConfig::new instead"
162    )]
163    pub fn with_synap_rpc_transport(mut self) -> Self {
164        self.transport = TransportMode::SynapRpc;
165        self
166    }
167
168    /// Use the RESP3 text transport (Redis-compatible wire protocol over TCP).
169    ///
170    /// # Deprecated
171    /// Prefer passing a `resp3://` URL to [`SynapConfig::new`].
172    #[deprecated(
173        since = "0.11.0",
174        note = "Pass a `resp3://` URL to SynapConfig::new instead"
175    )]
176    pub fn with_resp3_transport(mut self) -> Self {
177        self.transport = TransportMode::Resp3;
178        self
179    }
180
181    /// Override the SynapRPC listener address (host + port).
182    ///
183    /// # Deprecated
184    /// Encode the host and port in the `synap://host:port` URL instead.
185    #[deprecated(
186        since = "0.11.0",
187        note = "Encode host and port in the `synap://` URL passed to SynapConfig::new"
188    )]
189    pub fn with_rpc_addr(mut self, host: impl Into<String>, port: u16) -> Self {
190        self.rpc_host = host.into();
191        self.rpc_port = port;
192        self
193    }
194
195    /// Override the RESP3 listener address (host + port).
196    ///
197    /// # Deprecated
198    /// Encode the host and port in the `resp3://host:port` URL instead.
199    #[deprecated(
200        since = "0.11.0",
201        note = "Encode host and port in the `resp3://` URL passed to SynapConfig::new"
202    )]
203    pub fn with_resp3_addr(mut self, host: impl Into<String>, port: u16) -> Self {
204        self.resp3_host = host.into();
205        self.resp3_port = port;
206        self
207    }
208
209    /// Set the timeout for connections and requests.
210    pub fn with_timeout(mut self, timeout: Duration) -> Self {
211        self.timeout = timeout;
212        self
213    }
214
215    /// Set the authentication token (API key / Bearer token for HTTP).
216    pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
217        self.auth_token = Some(token.into());
218        self.username = None;
219        self.password = None;
220        self
221    }
222
223    /// Set HTTP Basic Auth credentials.
224    pub fn with_basic_auth(
225        mut self,
226        username: impl Into<String>,
227        password: impl Into<String>,
228    ) -> Self {
229        self.username = Some(username.into());
230        self.password = Some(password.into());
231        self.auth_token = None;
232        self
233    }
234
235    /// Set the maximum HTTP retry attempts.
236    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
237        self.max_retries = max_retries;
238        self
239    }
240}
241
242// ── Internal transport enum ───────────────────────────────────────────────────
243
/// Transport selected for a client when it is constructed.
///
/// The binary variants keep their transport object behind an `Arc`.
enum Transport {
    /// Commands are sent over the HTTP REST endpoint; no extra state is held.
    Http,
    /// Commands are sent through a SynapRPC transport.
    SynapRpc(Arc<SynapRpcTransport>),
    /// Commands are sent through a RESP3 transport.
    Resp3(Arc<Resp3Transport>),
}
249
250// ── SynapClient ───────────────────────────────────────────────────────────────
251
/// Main Synap client.
///
/// Internally uses one of three transports — SynapRPC, RESP3, or HTTP —
/// selected via [`SynapConfig::transport`] when the client is created.
/// With SynapRPC or RESP3, a command that has no native-protocol mapping is
/// rejected with [`SynapError::UnsupportedCommand`]; it is *not* silently
/// retried over HTTP.
///
/// Clones share the same configuration and transport (both are held behind
/// an `Arc`).
#[derive(Clone)]
pub struct SynapClient {
    /// Configuration the client was built from.
    #[allow(dead_code)]
    config: Arc<SynapConfig>,
    /// HTTP client carrying the timeout and any default `Authorization` header.
    http_client: Client,
    /// Parsed form of `SynapConfig::base_url`.
    base_url: Url,
    /// Active transport, chosen from `SynapConfig::transport`.
    transport: Arc<Transport>,
}
266
267impl SynapClient {
268    /// Create a new Synap client using the provided configuration.
269    pub fn new(config: SynapConfig) -> Result<Self> {
270        let base_url = Url::parse(&config.base_url)?;
271
272        // Build reqwest HTTP client (needed for fallback and Http transport).
273        let mut builder = Client::builder().timeout(config.timeout);
274
275        if let Some(ref token) = config.auth_token {
276            let mut headers = reqwest::header::HeaderMap::new();
277            headers.insert(
278                reqwest::header::AUTHORIZATION,
279                format!("Bearer {}", token).parse().unwrap(),
280            );
281            builder = builder.default_headers(headers);
282        } else if let (Some(username), Some(password)) = (&config.username, &config.password) {
283            use base64::Engine;
284            let encoded = base64::engine::general_purpose::STANDARD
285                .encode(format!("{}:{}", username, password));
286            let mut headers = reqwest::header::HeaderMap::new();
287            headers.insert(
288                reqwest::header::AUTHORIZATION,
289                format!("Basic {}", encoded).parse().unwrap(),
290            );
291            builder = builder.default_headers(headers);
292        }
293
294        let http_client = builder.build()?;
295
296        let transport = match config.transport {
297            TransportMode::Http => Arc::new(Transport::Http),
298            TransportMode::SynapRpc => Arc::new(Transport::SynapRpc(Arc::new(
299                SynapRpcTransport::new(&config.rpc_host, config.rpc_port, config.timeout),
300            ))),
301            TransportMode::Resp3 => Arc::new(Transport::Resp3(Arc::new(Resp3Transport::new(
302                &config.resp3_host,
303                config.resp3_port,
304                config.timeout,
305            )))),
306        };
307
308        Ok(Self {
309            config: Arc::new(config),
310            http_client,
311            base_url,
312            transport,
313        })
314    }
315
316    // ── Manager accessors ─────────────────────────────────────────────────────
317
318    /// Get the Key-Value store interface.
319    pub fn kv(&self) -> KVStore {
320        KVStore::new(self.clone())
321    }
322
323    /// Get the Hash manager interface.
324    pub fn hash(&self) -> HashManager {
325        HashManager::new(self.clone())
326    }
327
328    /// Get the List manager interface.
329    pub fn list(&self) -> ListManager {
330        ListManager::new(self.clone())
331    }
332
333    /// Get the Set manager interface.
334    pub fn set(&self) -> SetManager {
335        SetManager::new(self.clone())
336    }
337
338    /// Get the Sorted Set manager interface.
339    pub fn sorted_set(&self) -> SortedSetManager {
340        SortedSetManager::new(self.clone())
341    }
342
343    /// Get the Queue manager interface.
344    pub fn queue(&self) -> QueueManager {
345        QueueManager::new(self.clone())
346    }
347
348    /// Get the Stream manager interface.
349    pub fn stream(&self) -> StreamManager {
350        StreamManager::new(self.clone())
351    }
352
353    /// Get the Pub/Sub manager interface.
354    pub fn pubsub(&self) -> PubSubManager {
355        PubSubManager::new(self.clone())
356    }
357
358    /// Get the Transaction manager interface.
359    pub fn transaction(&self) -> TransactionManager {
360        TransactionManager::new(self.clone())
361    }
362
363    /// Get the Scripting manager interface.
364    pub fn script(&self) -> ScriptManager {
365        ScriptManager::new(self.clone())
366    }
367
368    /// Get the HyperLogLog manager interface.
369    pub fn hyperloglog(&self) -> HyperLogLogManager {
370        HyperLogLogManager::new(self.clone())
371    }
372
373    /// Get the Bitmap manager interface.
374    pub fn bitmap(&self) -> BitmapManager {
375        BitmapManager::new(self.clone())
376    }
377
378    /// Get the Geospatial manager interface.
379    pub fn geospatial(&self) -> GeospatialManager {
380        GeospatialManager::new(self.clone())
381    }
382
383    // ── Command dispatch ──────────────────────────────────────────────────────
384
385    /// Dispatch a command to the active transport.
386    ///
387    /// For SynapRPC and RESP3 transports every command must have a native
388    /// mapping in the transport mapper.  Unmapped commands return
389    /// [`SynapError::UnsupportedCommand`] — there is no silent HTTP fallback.
390    /// Use an `http://` URL if you need HTTP REST for a command that is not
391    /// yet in the mapper.
392    pub async fn send_command(&self, command: &str, payload: Value) -> Result<Value> {
393        match self.transport.as_ref() {
394            Transport::Http => self.send_http(command, payload).await,
395
396            Transport::SynapRpc(rpc) => match map_command(command, &payload) {
397                Some((raw_cmd, args)) => {
398                    let wire = rpc.execute(raw_cmd, args).await?;
399                    Ok(map_response(command, wire))
400                }
401                None => Err(SynapError::UnsupportedCommand {
402                    command: command.to_owned(),
403                    transport: "SynapRpc".to_owned(),
404                }),
405            },
406
407            Transport::Resp3(resp3) => match map_command(command, &payload) {
408                Some((raw_cmd, args)) => {
409                    let wire = resp3.execute(raw_cmd, args).await?;
410                    Ok(map_response(command, wire))
411                }
412                None => Err(SynapError::UnsupportedCommand {
413                    command: command.to_owned(),
414                    transport: "Resp3".to_owned(),
415                }),
416            },
417        }
418    }
419
420    /// Send a command via HTTP REST (original `api/v1/command` endpoint).
421    async fn send_http(&self, command: &str, payload: Value) -> Result<Value> {
422        let request_id = uuid::Uuid::new_v4().to_string();
423
424        let body = serde_json::json!({
425            "command": command,
426            "request_id": request_id,
427            "payload": payload,
428        });
429
430        let url = self
431            .base_url
432            .join("api/v1/command")
433            .map_err(SynapError::InvalidUrl)?;
434
435        let response = self.http_client.post(url).json(&body).send().await?;
436
437        if !response.status().is_success() {
438            let error_text = response.text().await.unwrap_or_default();
439            return Err(SynapError::ServerError(error_text));
440        }
441
442        let result: Value = response.json().await?;
443
444        if !result["success"].as_bool().unwrap_or(false) {
445            let error_msg = result["error"]
446                .as_str()
447                .unwrap_or("Unknown error")
448                .to_string();
449            return Err(SynapError::ServerError(error_msg));
450        }
451
452        Ok(result["payload"].clone())
453    }
454
455    // ── Accessors ─────────────────────────────────────────────────────────────
456
457    /// Get the configured base URL.
458    #[allow(dead_code)]
459    pub fn base_url(&self) -> &Url {
460        &self.base_url
461    }
462
463    /// Get the underlying reqwest HTTP client.
464    #[allow(dead_code)]
465    pub(crate) fn http_client(&self) -> &Client {
466        &self.http_client
467    }
468
469    /// Return a reference to the `SynapRpcTransport` when the active transport
470    /// is `SynapRpc`, or `None` for HTTP / RESP3.
471    pub(crate) fn synap_rpc_transport(&self) -> Option<Arc<SynapRpcTransport>> {
472        match self.transport.as_ref() {
473            Transport::SynapRpc(rpc) => Some(Arc::clone(rpc)),
474            _ => None,
475        }
476    }
477}
478
479// ── Tests ─────────────────────────────────────────────────────────────────────
480
#[cfg(test)]
mod tests {
    //! Unit tests for configuration parsing and client construction.
    //! None of them open a network connection.

    use super::*;

    /// Configuration pointing at the default local HTTP endpoint.
    fn http_config() -> SynapConfig {
        SynapConfig::new("http://localhost:15500")
    }

    #[test]
    fn test_config_creation_http_url_infers_http_transport() {
        let config = SynapConfig::new("http://localhost:15500");
        assert_eq!(config.base_url, "http://localhost:15500");
        assert_eq!(config.timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert!(config.auth_token.is_none());
        // http:// URLs → Http transport (not SynapRpc)
        assert!(matches!(config.transport, TransportMode::Http));
    }

    #[test]
    fn test_config_synap_url_infers_rpc_transport() {
        let c = SynapConfig::new("synap://localhost:15501");
        assert!(matches!(c.transport, TransportMode::SynapRpc));
        assert_eq!(c.rpc_host, "localhost");
        assert_eq!(c.rpc_port, 15501);
    }

    #[test]
    fn test_config_resp3_url_infers_resp3_transport() {
        let c = SynapConfig::new("resp3://localhost:6379");
        assert!(matches!(c.transport, TransportMode::Resp3));
        assert_eq!(c.resp3_host, "localhost");
        assert_eq!(c.resp3_port, 6379);
    }

    #[test]
    fn test_config_synap_url_default_port() {
        let c = SynapConfig::new("synap://myhost");
        assert!(matches!(c.transport, TransportMode::SynapRpc));
        assert_eq!(c.rpc_host, "myhost");
        assert_eq!(c.rpc_port, 15501); // default
    }

    #[test]
    fn test_config_resp3_url_default_port() {
        let c = SynapConfig::new("resp3://myhost");
        assert!(matches!(c.transport, TransportMode::Resp3));
        assert_eq!(c.resp3_host, "myhost");
        assert_eq!(c.resp3_port, 6379); // default
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_transport_selection() {
        let c = SynapConfig::new("http://localhost:15500").with_http_transport();
        assert!(matches!(c.transport, TransportMode::Http));

        let c = SynapConfig::new("http://localhost:15500").with_resp3_transport();
        assert!(matches!(c.transport, TransportMode::Resp3));

        let c = SynapConfig::new("http://localhost:15500").with_synap_rpc_transport();
        assert!(matches!(c.transport, TransportMode::SynapRpc));
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_rpc_addr() {
        let c = SynapConfig::new("http://localhost:15500").with_rpc_addr("10.0.0.1", 15502);
        assert_eq!(c.rpc_host, "10.0.0.1");
        assert_eq!(c.rpc_port, 15502);
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_resp3_addr() {
        let c = SynapConfig::new("http://localhost:15500").with_resp3_addr("10.0.0.1", 6380);
        assert_eq!(c.resp3_host, "10.0.0.1");
        assert_eq!(c.resp3_port, 6380);
    }

    #[test]
    fn test_config_builder() {
        let config = SynapConfig::new("http://localhost:15500")
            .with_timeout(Duration::from_secs(10))
            .with_auth_token("test-token")
            .with_max_retries(5);

        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.auth_token, Some("test-token".to_string()));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_client_creation_http() {
        let config = http_config();
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_creation_synap_rpc() {
        // SynapRpc transport: creating the client succeeds even when no server
        // is running — the connection is established lazily.  The `synap://`
        // scheme is required here; an `http://` URL would select the HTTP
        // transport and leave the SynapRPC path untested.
        let config = SynapConfig::new("synap://localhost:15501");
        assert!(matches!(config.transport, TransportMode::SynapRpc));
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_creation_resp3() {
        let config = SynapConfig::new("resp3://localhost:6379");
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_with_auth() {
        let config = http_config().with_auth_token("secret-token-123");
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_invalid_url() {
        // No recognised scheme → falls through to HTTP with the raw string as
        // base_url.  `Url::parse` in `SynapClient::new` then rejects it
        // because it is not an absolute URL.
        let config = SynapConfig::new("not-a-valid-url");
        let client = SynapClient::new(config);
        assert!(client.is_err());
    }

    #[test]
    fn test_client_relative_url() {
        let config = SynapConfig::new("/relative/path");
        let client = SynapClient::new(config);
        assert!(client.is_err());
    }

    #[test]
    fn test_client_kv_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _kv = client.kv();
    }

    #[test]
    fn test_client_queue_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _queue = client.queue();
    }

    #[test]
    fn test_client_transaction_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _tx = client.transaction();
    }

    #[test]
    fn test_client_script_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _script = client.script();
    }

    #[test]
    fn test_client_hyperloglog_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _hll = client.hyperloglog();
    }

    #[test]
    fn test_client_bitmap_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _bitmap = client.bitmap();
    }

    #[test]
    fn test_client_geospatial_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _geo = client.geospatial();
    }

    #[test]
    fn test_client_stream_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _stream = client.stream();
    }

    #[test]
    fn test_client_pubsub_interface() {
        let client = SynapClient::new(http_config()).unwrap();
        let _pubsub = client.pubsub();
    }

    #[test]
    fn test_client_clone() {
        // A clone must share the same configuration allocation, not copy it.
        let client = SynapClient::new(http_config()).unwrap();
        let client2 = client.clone();
        assert!(Arc::ptr_eq(&client.config, &client2.config));
    }

    #[test]
    fn test_base_url_getter() {
        // `Url` normalises an empty path to "/".
        let client = SynapClient::new(http_config()).unwrap();
        assert_eq!(client.base_url().as_str(), "http://localhost:15500/");
    }

    #[test]
    fn test_http_client_getter() {
        let client = SynapClient::new(http_config()).unwrap();
        let _http = client.http_client();
    }

    #[test]
    fn test_config_with_custom_timeout() {
        let config = http_config().with_timeout(Duration::from_secs(60));
        assert_eq!(config.timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_config_with_zero_retries() {
        let config = http_config().with_max_retries(0);
        assert_eq!(config.max_retries, 0);
    }

    #[test]
    fn test_config_clone() {
        let config = http_config().with_auth_token("token");
        let config2 = config.clone();
        assert_eq!(config.base_url, config2.base_url);
        assert_eq!(config.auth_token, config2.auth_token);
    }

    #[test]
    fn test_config_debug_format() {
        let config = http_config();
        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("SynapConfig"));
        assert!(debug_str.contains("http://localhost:15500"));
    }
}