1use std::sync::Arc;
4use std::time::Duration;
5
6use reqwest::Client;
7use serde_json::Value;
8use url::Url;
9
10use crate::error::{Result, SynapError};
11use crate::transport::{
12 Resp3Transport, SynapRpcTransport, TransportMode, map_command, map_response,
13};
14use crate::{
15 BitmapManager, GeospatialManager, HashManager, HyperLogLogManager, KVStore, ListManager,
16 PubSubManager, QueueManager, ScriptManager, SetManager, SortedSetManager, StreamManager,
17 TransactionManager,
18};
19
20#[derive(Debug, Clone)]
24pub struct SynapConfig {
25 pub base_url: String,
27 pub rpc_host: String,
29 pub rpc_port: u16,
31 pub resp3_host: String,
33 pub resp3_port: u16,
35 pub transport: TransportMode,
37 pub timeout: Duration,
39 pub max_retries: u32,
41 pub auth_token: Option<String>,
43 pub username: Option<String>,
45 pub password: Option<String>,
47}
48
49impl SynapConfig {
50 pub fn new(url: impl Into<String>) -> Self {
71 let raw = url.into();
72
73 if let Some(rest) = raw.strip_prefix("synap://") {
76 let (host, port) = Self::parse_host_port(rest, 15501);
77 return Self {
78 base_url: format!("http://{}:15500", host),
79 rpc_host: host,
80 rpc_port: port,
81 resp3_host: "127.0.0.1".into(),
82 resp3_port: 6379,
83 transport: TransportMode::SynapRpc,
84 timeout: Duration::from_secs(30),
85 max_retries: 3,
86 auth_token: None,
87 username: None,
88 password: None,
89 };
90 }
91
92 if let Some(rest) = raw.strip_prefix("resp3://") {
93 let (host, port) = Self::parse_host_port(rest, 6379);
94 return Self {
95 base_url: format!("http://{}:15500", host),
96 rpc_host: "127.0.0.1".into(),
97 rpc_port: 15501,
98 resp3_host: host,
99 resp3_port: port,
100 transport: TransportMode::Resp3,
101 timeout: Duration::from_secs(30),
102 max_retries: 3,
103 auth_token: None,
104 username: None,
105 password: None,
106 };
107 }
108
109 Self {
111 base_url: raw,
112 rpc_host: "127.0.0.1".into(),
113 rpc_port: 15501,
114 resp3_host: "127.0.0.1".into(),
115 resp3_port: 6379,
116 transport: TransportMode::Http,
117 timeout: Duration::from_secs(30),
118 max_retries: 3,
119 auth_token: None,
120 username: None,
121 password: None,
122 }
123 }
124
125 fn parse_host_port(authority: &str, default_port: u16) -> (String, u16) {
128 let authority = authority.split('/').next().unwrap_or(authority);
130 if let Some(colon) = authority.rfind(':') {
131 let host = authority[..colon].to_string();
132 let port = authority[colon + 1..]
133 .parse::<u16>()
134 .unwrap_or(default_port);
135 (host, port)
136 } else {
137 (authority.to_string(), default_port)
138 }
139 }
140
141 #[deprecated(
146 since = "0.11.0",
147 note = "Pass an `http://` URL to SynapConfig::new instead"
148 )]
149 pub fn with_http_transport(mut self) -> Self {
150 self.transport = TransportMode::Http;
151 self
152 }
153
154 #[deprecated(
160 since = "0.11.0",
161 note = "Pass a `synap://` URL to SynapConfig::new instead"
162 )]
163 pub fn with_synap_rpc_transport(mut self) -> Self {
164 self.transport = TransportMode::SynapRpc;
165 self
166 }
167
168 #[deprecated(
173 since = "0.11.0",
174 note = "Pass a `resp3://` URL to SynapConfig::new instead"
175 )]
176 pub fn with_resp3_transport(mut self) -> Self {
177 self.transport = TransportMode::Resp3;
178 self
179 }
180
181 #[deprecated(
186 since = "0.11.0",
187 note = "Encode host and port in the `synap://` URL passed to SynapConfig::new"
188 )]
189 pub fn with_rpc_addr(mut self, host: impl Into<String>, port: u16) -> Self {
190 self.rpc_host = host.into();
191 self.rpc_port = port;
192 self
193 }
194
195 #[deprecated(
200 since = "0.11.0",
201 note = "Encode host and port in the `resp3://` URL passed to SynapConfig::new"
202 )]
203 pub fn with_resp3_addr(mut self, host: impl Into<String>, port: u16) -> Self {
204 self.resp3_host = host.into();
205 self.resp3_port = port;
206 self
207 }
208
209 pub fn with_timeout(mut self, timeout: Duration) -> Self {
211 self.timeout = timeout;
212 self
213 }
214
215 pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
217 self.auth_token = Some(token.into());
218 self.username = None;
219 self.password = None;
220 self
221 }
222
223 pub fn with_basic_auth(
225 mut self,
226 username: impl Into<String>,
227 password: impl Into<String>,
228 ) -> Self {
229 self.username = Some(username.into());
230 self.password = Some(password.into());
231 self.auth_token = None;
232 self
233 }
234
235 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
237 self.max_retries = max_retries;
238 self
239 }
240}
241
242enum Transport {
245 Http,
246 SynapRpc(Arc<SynapRpcTransport>),
247 Resp3(Arc<Resp3Transport>),
248}
249
250#[derive(Clone)]
259pub struct SynapClient {
260 #[allow(dead_code)]
261 config: Arc<SynapConfig>,
262 http_client: Client,
263 base_url: Url,
264 transport: Arc<Transport>,
265}
266
267impl SynapClient {
268 pub fn new(config: SynapConfig) -> Result<Self> {
270 let base_url = Url::parse(&config.base_url)?;
271
272 let mut builder = Client::builder().timeout(config.timeout);
274
275 if let Some(ref token) = config.auth_token {
276 let mut headers = reqwest::header::HeaderMap::new();
277 headers.insert(
278 reqwest::header::AUTHORIZATION,
279 format!("Bearer {}", token).parse().unwrap(),
280 );
281 builder = builder.default_headers(headers);
282 } else if let (Some(username), Some(password)) = (&config.username, &config.password) {
283 use base64::Engine;
284 let encoded = base64::engine::general_purpose::STANDARD
285 .encode(format!("{}:{}", username, password));
286 let mut headers = reqwest::header::HeaderMap::new();
287 headers.insert(
288 reqwest::header::AUTHORIZATION,
289 format!("Basic {}", encoded).parse().unwrap(),
290 );
291 builder = builder.default_headers(headers);
292 }
293
294 let http_client = builder.build()?;
295
296 let transport = match config.transport {
297 TransportMode::Http => Arc::new(Transport::Http),
298 TransportMode::SynapRpc => Arc::new(Transport::SynapRpc(Arc::new(
299 SynapRpcTransport::new(&config.rpc_host, config.rpc_port, config.timeout),
300 ))),
301 TransportMode::Resp3 => Arc::new(Transport::Resp3(Arc::new(Resp3Transport::new(
302 &config.resp3_host,
303 config.resp3_port,
304 config.timeout,
305 )))),
306 };
307
308 Ok(Self {
309 config: Arc::new(config),
310 http_client,
311 base_url,
312 transport,
313 })
314 }
315
316 pub fn kv(&self) -> KVStore {
320 KVStore::new(self.clone())
321 }
322
323 pub fn hash(&self) -> HashManager {
325 HashManager::new(self.clone())
326 }
327
328 pub fn list(&self) -> ListManager {
330 ListManager::new(self.clone())
331 }
332
333 pub fn set(&self) -> SetManager {
335 SetManager::new(self.clone())
336 }
337
338 pub fn sorted_set(&self) -> SortedSetManager {
340 SortedSetManager::new(self.clone())
341 }
342
343 pub fn queue(&self) -> QueueManager {
345 QueueManager::new(self.clone())
346 }
347
348 pub fn stream(&self) -> StreamManager {
350 StreamManager::new(self.clone())
351 }
352
353 pub fn pubsub(&self) -> PubSubManager {
355 PubSubManager::new(self.clone())
356 }
357
358 pub fn transaction(&self) -> TransactionManager {
360 TransactionManager::new(self.clone())
361 }
362
363 pub fn script(&self) -> ScriptManager {
365 ScriptManager::new(self.clone())
366 }
367
368 pub fn hyperloglog(&self) -> HyperLogLogManager {
370 HyperLogLogManager::new(self.clone())
371 }
372
373 pub fn bitmap(&self) -> BitmapManager {
375 BitmapManager::new(self.clone())
376 }
377
378 pub fn geospatial(&self) -> GeospatialManager {
380 GeospatialManager::new(self.clone())
381 }
382
383 pub async fn send_command(&self, command: &str, payload: Value) -> Result<Value> {
393 match self.transport.as_ref() {
394 Transport::Http => self.send_http(command, payload).await,
395
396 Transport::SynapRpc(rpc) => match map_command(command, &payload) {
397 Some((raw_cmd, args)) => {
398 let wire = rpc.execute(raw_cmd, args).await?;
399 Ok(map_response(command, wire))
400 }
401 None => Err(SynapError::UnsupportedCommand {
402 command: command.to_owned(),
403 transport: "SynapRpc".to_owned(),
404 }),
405 },
406
407 Transport::Resp3(resp3) => match map_command(command, &payload) {
408 Some((raw_cmd, args)) => {
409 let wire = resp3.execute(raw_cmd, args).await?;
410 Ok(map_response(command, wire))
411 }
412 None => Err(SynapError::UnsupportedCommand {
413 command: command.to_owned(),
414 transport: "Resp3".to_owned(),
415 }),
416 },
417 }
418 }
419
420 async fn send_http(&self, command: &str, payload: Value) -> Result<Value> {
422 let request_id = uuid::Uuid::new_v4().to_string();
423
424 let body = serde_json::json!({
425 "command": command,
426 "request_id": request_id,
427 "payload": payload,
428 });
429
430 let url = self
431 .base_url
432 .join("api/v1/command")
433 .map_err(SynapError::InvalidUrl)?;
434
435 let response = self.http_client.post(url).json(&body).send().await?;
436
437 if !response.status().is_success() {
438 let error_text = response.text().await.unwrap_or_default();
439 return Err(SynapError::ServerError(error_text));
440 }
441
442 let result: Value = response.json().await?;
443
444 if !result["success"].as_bool().unwrap_or(false) {
445 let error_msg = result["error"]
446 .as_str()
447 .unwrap_or("Unknown error")
448 .to_string();
449 return Err(SynapError::ServerError(error_msg));
450 }
451
452 Ok(result["payload"].clone())
453 }
454
455 #[allow(dead_code)]
459 pub fn base_url(&self) -> &Url {
460 &self.base_url
461 }
462
463 #[allow(dead_code)]
465 pub(crate) fn http_client(&self) -> &Client {
466 &self.http_client
467 }
468
469 pub(crate) fn synap_rpc_transport(&self) -> Option<Arc<SynapRpcTransport>> {
472 match self.transport.as_ref() {
473 Transport::SynapRpc(rpc) => Some(Arc::clone(rpc)),
474 _ => None,
475 }
476 }
477}
478
#[cfg(test)]
mod tests {
    use super::*;

    /// Configuration pointing at the default local HTTP endpoint.
    fn http_config() -> SynapConfig {
        SynapConfig::new("http://localhost:15500")
    }

    /// Client built from [`http_config`]; panics if construction fails.
    fn http_client() -> SynapClient {
        SynapClient::new(http_config()).unwrap()
    }

    #[test]
    fn test_config_creation_http_url_infers_http_transport() {
        let config = SynapConfig::new("http://localhost:15500");
        assert_eq!(config.base_url, "http://localhost:15500");
        assert_eq!(config.timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert!(config.auth_token.is_none());
        assert!(matches!(config.transport, TransportMode::Http));
    }

    #[test]
    fn test_config_synap_url_infers_rpc_transport() {
        let c = SynapConfig::new("synap://localhost:15501");
        assert!(matches!(c.transport, TransportMode::SynapRpc));
        assert_eq!(c.rpc_host, "localhost");
        assert_eq!(c.rpc_port, 15501);
    }

    #[test]
    fn test_config_resp3_url_infers_resp3_transport() {
        let c = SynapConfig::new("resp3://localhost:6379");
        assert!(matches!(c.transport, TransportMode::Resp3));
        assert_eq!(c.resp3_host, "localhost");
        assert_eq!(c.resp3_port, 6379);
    }

    #[test]
    fn test_config_synap_url_default_port() {
        let c = SynapConfig::new("synap://myhost");
        assert!(matches!(c.transport, TransportMode::SynapRpc));
        assert_eq!(c.rpc_host, "myhost");
        assert_eq!(c.rpc_port, 15501);
    }

    #[test]
    fn test_config_resp3_url_default_port() {
        let c = SynapConfig::new("resp3://myhost");
        assert!(matches!(c.transport, TransportMode::Resp3));
        assert_eq!(c.resp3_host, "myhost");
        assert_eq!(c.resp3_port, 6379);
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_transport_selection() {
        let c = SynapConfig::new("http://localhost:15500").with_http_transport();
        assert!(matches!(c.transport, TransportMode::Http));

        let c = SynapConfig::new("http://localhost:15500").with_resp3_transport();
        assert!(matches!(c.transport, TransportMode::Resp3));

        let c = SynapConfig::new("http://localhost:15500").with_synap_rpc_transport();
        assert!(matches!(c.transport, TransportMode::SynapRpc));
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_rpc_addr() {
        let c = SynapConfig::new("http://localhost:15500").with_rpc_addr("10.0.0.1", 15502);
        assert_eq!(c.rpc_host, "10.0.0.1");
        assert_eq!(c.rpc_port, 15502);
    }

    #[test]
    #[allow(deprecated)]
    fn test_config_deprecated_resp3_addr() {
        let c = SynapConfig::new("http://localhost:15500").with_resp3_addr("10.0.0.1", 6380);
        assert_eq!(c.resp3_host, "10.0.0.1");
        assert_eq!(c.resp3_port, 6380);
    }

    #[test]
    fn test_config_builder() {
        let config = SynapConfig::new("http://localhost:15500")
            .with_timeout(Duration::from_secs(10))
            .with_auth_token("test-token")
            .with_max_retries(5);

        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.auth_token, Some("test-token".to_string()));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_client_creation_http() {
        let client = SynapClient::new(http_config());
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_creation_synap_rpc() {
        // Use a `synap://` URL so the SynapRPC transport is really the one
        // being constructed; an `http://` URL would only re-test HTTP.
        let config = SynapConfig::new("synap://localhost:15501");
        assert!(matches!(config.transport, TransportMode::SynapRpc));

        let client = SynapClient::new(config);
        assert!(client.is_ok());
        assert!(client.unwrap().synap_rpc_transport().is_some());
    }

    #[test]
    fn test_client_creation_resp3() {
        let config = SynapConfig::new("resp3://localhost:6379");
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_with_auth() {
        let config = http_config().with_auth_token("secret-token-123");
        let client = SynapClient::new(config);
        assert!(client.is_ok());
    }

    #[test]
    fn test_client_invalid_url() {
        let config = SynapConfig::new("not-a-valid-url");
        let client = SynapClient::new(config);
        assert!(client.is_err());
    }

    #[test]
    fn test_client_relative_url() {
        let config = SynapConfig::new("/relative/path");
        let client = SynapClient::new(config);
        assert!(client.is_err());
    }

    #[test]
    fn test_client_kv_interface() {
        let _kv = http_client().kv();
    }

    #[test]
    fn test_client_queue_interface() {
        let _queue = http_client().queue();
    }

    #[test]
    fn test_client_transaction_interface() {
        let _tx = http_client().transaction();
    }

    #[test]
    fn test_client_script_interface() {
        let _script = http_client().script();
    }

    #[test]
    fn test_client_hyperloglog_interface() {
        let _hll = http_client().hyperloglog();
    }

    #[test]
    fn test_client_bitmap_interface() {
        let _bitmap = http_client().bitmap();
    }

    #[test]
    fn test_client_geospatial_interface() {
        let _geo = http_client().geospatial();
    }

    #[test]
    fn test_client_stream_interface() {
        let _stream = http_client().stream();
    }

    #[test]
    fn test_client_pubsub_interface() {
        let _pubsub = http_client().pubsub();
    }

    #[test]
    fn test_client_clone() {
        let client = http_client();
        let client2 = client.clone();
        // Both handles must point at the same shared configuration.
        assert!(Arc::ptr_eq(&client.config, &client2.config));
    }

    #[test]
    fn test_base_url_getter() {
        let client = http_client();
        assert_eq!(client.base_url().as_str(), "http://localhost:15500/");
    }

    #[test]
    fn test_http_client_getter() {
        let client = http_client();
        let _http = client.http_client();
    }

    #[test]
    fn test_config_with_custom_timeout() {
        let config = http_config().with_timeout(Duration::from_secs(60));
        assert_eq!(config.timeout, Duration::from_secs(60));
    }

    #[test]
    fn test_config_with_zero_retries() {
        let config = http_config().with_max_retries(0);
        assert_eq!(config.max_retries, 0);
    }

    #[test]
    fn test_config_clone() {
        let config = http_config().with_auth_token("token");
        let config2 = config.clone();
        assert_eq!(config.base_url, config2.base_url);
        assert_eq!(config.auth_token, config2.auth_token);
    }

    #[test]
    fn test_config_debug_format() {
        let debug_str = format!("{:?}", http_config());
        assert!(debug_str.contains("SynapConfig"));
        assert!(debug_str.contains("http://localhost:15500"));
    }
}