1use anyhow::{Context, Result};
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::{
7 path::PathBuf,
8 sync::{
9 Arc,
10 atomic::{AtomicBool, Ordering},
11 },
12};
13use synapse_primitives::{InstanceId, Uuid};
14use synapse_proto::{
15 HttpEndpointRegister, HttpMethod, HttpRoute, InstanceCapabilities, InterfaceRegister,
16 MessageKind, RpcRequest, RpcResponse, SynapseMessage, synapse_message,
17};
18use synapse_rpc::{
19 HttpRpcClient, InterfaceRegistration, PROTOCOL_VERSION, RpcHandler, RpcServer,
20 extract_http_registration_ack, extract_registration_ack,
21};
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
25#[derive(Clone)]
27pub struct MtlsClientConfig {
28 pub cert_path: PathBuf,
29 pub key_path: PathBuf,
30 pub ca_cert_path: PathBuf,
31}
32
33pub struct ServiceBuilder {
35 service_name: String,
36 instance_id: InstanceId,
37 http_endpoint: Option<String>,
38 gateway_url: Option<String>,
39 mtls_config: Option<MtlsClientConfig>,
40}
41
42impl ServiceBuilder {
43 pub fn new(service_name: impl Into<String>) -> Self {
45 Self {
46 service_name: service_name.into(),
47 instance_id: InstanceId::new_random(),
48 http_endpoint: None,
49 gateway_url: None,
50 mtls_config: None,
51 }
52 }
53
54 pub fn with_instance_id(mut self, instance_id: InstanceId) -> Self {
56 self.instance_id = instance_id;
57 self
58 }
59
60 pub fn with_http_endpoint(mut self, http_endpoint: impl Into<String>) -> Self {
62 self.http_endpoint = Some(http_endpoint.into());
63 self
64 }
65
66 pub fn with_gateway(mut self, gateway_url: impl Into<String>) -> Self {
68 self.gateway_url = Some(gateway_url.into());
69 self
70 }
71
72 pub fn with_gateway_mtls(
74 mut self,
75 gateway_url: impl Into<String>,
76 cert_path: impl Into<PathBuf>,
77 key_path: impl Into<PathBuf>,
78 ca_cert_path: impl Into<PathBuf>,
79 ) -> Self {
80 self.gateway_url = Some(gateway_url.into());
81 self.mtls_config = Some(MtlsClientConfig {
82 cert_path: cert_path.into(),
83 key_path: key_path.into(),
84 ca_cert_path: ca_cert_path.into(),
85 });
86 self
87 }
88
89 pub fn build(self) -> Service {
91 let gateway_client = match (&self.gateway_url, &self.mtls_config) {
92 (Some(url), Some(mtls)) => {
93 match HttpRpcClient::json_mtls(
94 url,
95 &mtls.cert_path,
96 &mtls.key_path,
97 &mtls.ca_cert_path,
98 ) {
99 Ok(client) => Some(client),
100 Err(e) => {
101 error!("Failed to create mTLS gateway client: {}", e);
102 None
103 }
104 }
105 }
106 (Some(url), None) => Some(HttpRpcClient::json(url)),
107 _ => None,
108 };
109
110 Service {
111 service_name: self.service_name,
112 instance_id: self.instance_id,
113 http_endpoint: self.http_endpoint,
114 rpc_server: Arc::new(RpcServer::new()),
115 gateway_client,
116 health_check: None,
117 start_time: std::time::Instant::now(),
118 registered_interfaces: RwLock::new(Vec::new()),
119 registered_http_routes: RwLock::new(Vec::new()),
120 registered_http_route_groups: RwLock::new(Vec::new()),
121 gateway_connected: AtomicBool::new(false),
122 }
123 }
124}
125
126pub type HealthCheckFn = Arc<dyn Fn() -> HealthStatus + Send + Sync>;
128
129#[derive(Debug, Clone)]
131pub struct HealthStatus {
132 pub status: synapse_proto::HealthStatus,
133 pub message: Option<String>,
134}
135
136impl HealthStatus {
137 pub fn healthy() -> Self {
138 Self {
139 status: synapse_proto::HealthStatus::Healthy,
140 message: None,
141 }
142 }
143
144 pub fn degraded(message: impl Into<String>) -> Self {
145 Self {
146 status: synapse_proto::HealthStatus::Degraded,
147 message: Some(message.into()),
148 }
149 }
150
151 pub fn unhealthy(message: impl Into<String>) -> Self {
152 Self {
153 status: synapse_proto::HealthStatus::Unhealthy,
154 message: Some(message.into()),
155 }
156 }
157
158 pub fn draining(message: impl Into<String>) -> Self {
159 Self {
160 status: synapse_proto::HealthStatus::Draining,
161 message: Some(message.into()),
162 }
163 }
164}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
168pub enum RouteResponseType {
169 #[default]
171 Json,
172 Html,
174}
175
176#[derive(Debug, Clone)]
178pub struct HttpRouteConfig {
179 pub path: String,
181 pub methods: Vec<HttpMethod>,
183 pub subdomain: String,
185 pub middleware: Vec<String>,
187 pub metrics: bool,
189 pub response_type: RouteResponseType,
191}
192
193impl HttpRouteConfig {
194 pub fn new(path: impl Into<String>) -> Self {
196 Self {
197 path: path.into(),
198 methods: vec![],
199 subdomain: String::new(),
200 middleware: vec![],
201 metrics: false,
202 response_type: RouteResponseType::Json,
203 }
204 }
205
206 pub fn with_methods(mut self, methods: Vec<HttpMethod>) -> Self {
208 self.methods = methods;
209 self
210 }
211
212 pub fn with_subdomain(mut self, subdomain: impl Into<String>) -> Self {
214 self.subdomain = subdomain.into();
215 self
216 }
217
218 pub fn with_middleware(mut self, middleware: Vec<String>) -> Self {
220 self.middleware = middleware;
221 self
222 }
223
224 pub fn as_metrics(mut self) -> Self {
226 self.metrics = true;
227 self
228 }
229
230 pub fn as_html(mut self) -> Self {
232 self.response_type = RouteResponseType::Html;
233 self
234 }
235}
236
237#[derive(Debug, Clone)]
245pub struct HttpRouteGroup {
246 pub service_name: String,
248 pub host: String,
250 pub port: u32,
252 pub routes: Vec<HttpRouteConfig>,
254}
255
256impl HttpRouteGroup {
257 pub fn new(
258 service_name: impl Into<String>,
259 host: impl Into<String>,
260 port: u32,
261 routes: Vec<HttpRouteConfig>,
262 ) -> Self {
263 Self {
264 service_name: service_name.into(),
265 host: host.into(),
266 port,
267 routes,
268 }
269 }
270}
271
272#[derive(Clone)]
274struct RegisteredInterface {
275 registration: InterfaceRegistration,
276 #[allow(dead_code)]
277 handler: Arc<dyn RpcHandler>,
278}
279
280pub struct Service {
282 service_name: String,
283 instance_id: InstanceId,
284 http_endpoint: Option<String>,
285 rpc_server: Arc<RpcServer>,
286 gateway_client: Option<HttpRpcClient>,
287 health_check: Option<HealthCheckFn>,
288 start_time: std::time::Instant,
289 registered_interfaces: RwLock<Vec<RegisteredInterface>>,
291 registered_http_routes: RwLock<Vec<HttpRouteConfig>>,
293 registered_http_route_groups: RwLock<Vec<HttpRouteGroup>>,
295 gateway_connected: AtomicBool,
297}
298
299impl Service {
300 pub fn builder(service_name: impl Into<String>) -> ServiceBuilder {
302 ServiceBuilder::new(service_name)
303 }
304
305 pub fn name(&self) -> &str {
307 &self.service_name
308 }
309
310 pub fn instance_id(&self) -> InstanceId {
312 self.instance_id
313 }
314
315 pub fn rpc_server(&self) -> &Arc<RpcServer> {
317 &self.rpc_server
318 }
319
320 pub async fn register(
335 &self,
336 mut registration: InterfaceRegistration,
337 handler: Arc<dyn RpcHandler>,
338 ) -> Result<()> {
339 registration.instance_id = self.instance_id;
341 registration.service_name = self.service_name.clone();
342
343 self.register_inner(registration, handler).await
344 }
345
346 pub async fn register_interface(
351 &self,
352 interface_name: &str,
353 method_names: &[&str],
354 handler: Arc<dyn RpcHandler>,
355 ) -> Result<()> {
356 let registration = InterfaceRegistration::new(
357 interface_name,
358 &self.service_name,
359 method_names,
360 self.instance_id,
361 );
362
363 self.register_inner(registration, handler).await
364 }
365
366 async fn register_inner(
368 &self,
369 registration: InterfaceRegistration,
370 handler: Arc<dyn RpcHandler>,
371 ) -> Result<()> {
372 info!(
373 service = %self.service_name,
374 interface = %registration.interface_name,
375 interface_id = ?registration.interface_id,
376 "Registering interface locally"
377 );
378
379 self.rpc_server
381 .registry()
382 .register(registration.clone(), handler.clone())
383 .await
384 .map_err(|e| anyhow::anyhow!("Failed to register interface locally: {}", e))?;
385
386 {
388 let mut interfaces = self.registered_interfaces.write().await;
389 interfaces.push(RegisteredInterface {
390 registration: registration.clone(),
391 handler,
392 });
393 }
394
395 if let Some(ref client) = self.gateway_client {
397 match self.register_with_gateway(client, ®istration).await {
398 Ok(()) => {
399 self.gateway_connected.store(true, Ordering::SeqCst);
400 }
401 Err(e) => {
402 self.gateway_connected.store(false, Ordering::SeqCst);
403 warn!(
405 service = %self.service_name,
406 error = %e,
407 "Failed to register with gateway (will retry)"
408 );
409 }
410 }
411 }
412
413 Ok(())
414 }
415
416 pub async fn register_http_routes(&self, routes: Vec<HttpRouteConfig>) -> Result<()> {
425 let Some(ref client) = self.gateway_client else {
426 return Err(anyhow::anyhow!(
427 "Cannot register HTTP routes: no gateway client configured"
428 ));
429 };
430
431 {
433 let mut stored = self.registered_http_routes.write().await;
434 stored.extend(routes.iter().cloned());
435 }
436
437 let endpoint = self.http_endpoint.clone().unwrap_or_default();
438 let (host, port) = parse_host_port(&endpoint)?;
439 self.send_http_registration(client, &routes, &host, port, &self.service_name, false)
440 .await
441 }
442
443 pub async fn register_http_route_group(&self, group: HttpRouteGroup) -> Result<()> {
454 let Some(ref client) = self.gateway_client else {
455 return Err(anyhow::anyhow!(
456 "Cannot register HTTP routes: no gateway client configured"
457 ));
458 };
459
460 {
462 let mut stored = self.registered_http_route_groups.write().await;
463 stored.push(group.clone());
464 }
465
466 self.send_http_registration(
467 client,
468 &group.routes,
469 &group.host,
470 group.port,
471 &group.service_name,
472 true,
473 )
474 .await
475 }
476
477 async fn send_http_registration(
479 &self,
480 client: &HttpRpcClient,
481 routes: &[HttpRouteConfig],
482 host: &str,
483 port: u32,
484 service_name: &str,
485 delegated: bool,
486 ) -> Result<()> {
487 let proto_routes: Vec<HttpRoute> = routes
488 .iter()
489 .map(|r| HttpRoute {
490 path_prefix: r.path.clone(),
491 methods: r.methods.iter().map(|m| *m as i32).collect(),
492 options: None,
493 subdomain: r.subdomain.clone(),
494 middleware: r.middleware.clone(),
495 metrics: r.metrics,
496 response_type: match r.response_type {
497 RouteResponseType::Json => synapse_proto::ResponseType::Json as i32,
498 RouteResponseType::Html => synapse_proto::ResponseType::Html as i32,
499 },
500 })
501 .collect();
502
503 let register = HttpEndpointRegister {
504 instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
505 routes: proto_routes,
506 host: host.to_string(),
507 port,
508 health_check_path: String::new(),
509 health_check_interval_ms: 0,
510 service_name: service_name.to_string(),
511 delegated,
512 };
513
514 let request_id = Uuid::new_v4();
515 let message = SynapseMessage {
516 protocol_version: PROTOCOL_VERSION.as_u32(),
517 kind: MessageKind::HttpEndpointRegister as i32,
518 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
519 message: Some(synapse_message::Message::HttpEndpointRegister(register)),
520 };
521
522 let response = client
523 .send(&message)
524 .await
525 .context("Failed to send HTTP registration to gateway")?;
526
527 if let Some((_, ack)) = extract_http_registration_ack(response) {
528 if ack.success {
529 info!(
530 service = %service_name,
531 host = %host,
532 port = %port,
533 routes = routes.len(),
534 "HTTP routes registered with gateway"
535 );
536 Ok(())
537 } else {
538 error!(
539 service = %service_name,
540 error = %ack.error_message,
541 "Gateway rejected HTTP route registration"
542 );
543 Err(anyhow::anyhow!(
544 "Gateway rejected HTTP route registration: {}",
545 ack.error_message
546 ))
547 }
548 } else {
549 Err(anyhow::anyhow!(
550 "Unexpected response from gateway during HTTP registration"
551 ))
552 }
553 }
554
555 async fn register_with_gateway(
557 &self,
558 client: &HttpRpcClient,
559 registration: &InterfaceRegistration,
560 ) -> Result<()> {
561 let http_endpoint = self.http_endpoint.clone().unwrap_or_default();
562
563 info!(
564 service = %self.service_name,
565 interface = %registration.interface_name,
566 endpoint = %http_endpoint,
567 "Registering interface with gateway"
568 );
569
570 let register = InterfaceRegister {
572 interface_id: registration.interface_id.into(),
573 interface_version: registration.interface_version,
574 method_ids: registration
575 .method_ids
576 .iter()
577 .map(|m| (*m).into())
578 .collect(),
579 instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
580 capabilities: Some(InstanceCapabilities::default()),
581 service_name: self.service_name.clone(),
582 interface_name: registration.interface_name.clone(),
583 http_endpoint,
584 method_names: registration.method_names.clone(),
585 };
586
587 let request_id = Uuid::new_v4();
589 let message = SynapseMessage {
590 protocol_version: PROTOCOL_VERSION.as_u32(),
591 kind: MessageKind::InterfaceRegister as i32,
592 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
593 message: Some(synapse_message::Message::InterfaceRegister(register)),
594 };
595
596 let response = client
598 .send(&message)
599 .await
600 .context("Failed to send registration to gateway")?;
601
602 if let Some((_, ack)) = extract_registration_ack(response) {
604 if ack.success {
605 info!(
606 service = %self.service_name,
607 interface = %registration.interface_name,
608 "Successfully registered with gateway"
609 );
610 Ok(())
611 } else {
612 error!(
613 service = %self.service_name,
614 interface = %registration.interface_name,
615 error = %ack.error_message,
616 "Gateway rejected registration"
617 );
618 Err(anyhow::anyhow!(
619 "Gateway rejected registration: {}",
620 ack.error_message
621 ))
622 }
623 } else {
624 warn!(
625 service = %self.service_name,
626 "Unexpected response from gateway during registration"
627 );
628 Err(anyhow::anyhow!("Unexpected response from gateway"))
629 }
630 }
631
632 pub async fn handle_request(&self, request: RpcRequest) -> RpcResponse {
634 self.rpc_server.handle_request(request).await
635 }
636
637 pub fn set_health_check<F>(&mut self, check: F)
639 where
640 F: Fn() -> HealthStatus + Send + Sync + 'static,
641 {
642 self.health_check = Some(Arc::new(check));
643 }
644
645 fn get_health_status(&self) -> HealthStatus {
647 if let Some(ref health_check) = self.health_check {
648 health_check()
649 } else {
650 HealthStatus::healthy()
651 }
652 }
653
654 pub fn health_response(&self) -> synapse_proto::HealthResponse {
656 let health = self.get_health_status();
657 let uptime_ms = self.start_time.elapsed().as_millis() as i64;
658
659 synapse_proto::HealthResponse {
660 instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
661 status: health.status as i32,
662 version: env!("CARGO_PKG_VERSION").to_string(),
663 uptime_ms,
664 message: health.message.unwrap_or_default(),
665 }
666 }
667
668 pub fn is_gateway_connected(&self) -> bool {
670 self.gateway_connected.load(Ordering::SeqCst)
671 }
672
673 async fn reregister_all(&self) -> Result<()> {
675 let Some(ref client) = self.gateway_client else {
676 return Ok(());
677 };
678
679 let interfaces = self.registered_interfaces.read().await;
681 if !interfaces.is_empty() {
682 info!(
683 service = %self.service_name,
684 count = interfaces.len(),
685 "Re-registering interfaces with gateway"
686 );
687
688 for iface in interfaces.iter() {
689 match self
690 .register_with_gateway(client, &iface.registration)
691 .await
692 {
693 Ok(()) => {
694 info!(
695 service = %self.service_name,
696 interface = %iface.registration.interface_name,
697 "Re-registered interface"
698 );
699 }
700 Err(e) => {
701 error!(
702 service = %self.service_name,
703 interface = %iface.registration.interface_name,
704 error = %e,
705 "Failed to re-register interface"
706 );
707 return Err(e);
708 }
709 }
710 }
711 }
712 drop(interfaces);
713
714 let http_routes = self.registered_http_routes.read().await;
716 if !http_routes.is_empty() {
717 info!(
718 service = %self.service_name,
719 count = http_routes.len(),
720 "Re-registering HTTP routes with gateway"
721 );
722 let routes: Vec<HttpRouteConfig> = http_routes.clone();
723 drop(http_routes);
724
725 let endpoint = self.http_endpoint.clone().unwrap_or_default();
726 let (host, port) = parse_host_port(&endpoint)?;
727 if let Err(e) = self
728 .send_http_registration(client, &routes, &host, port, &self.service_name, false)
729 .await
730 {
731 error!(
732 service = %self.service_name,
733 error = %e,
734 "Failed to re-register HTTP routes"
735 );
736 return Err(e);
737 }
738 } else {
739 drop(http_routes);
740 }
741
742 let route_groups = self.registered_http_route_groups.read().await;
744 if !route_groups.is_empty() {
745 let groups: Vec<HttpRouteGroup> = route_groups.clone();
746 drop(route_groups);
747
748 for group in &groups {
749 info!(
750 service = %self.service_name,
751 target = %format!("{}:{}", group.host, group.port),
752 count = group.routes.len(),
753 "Re-registering delegated HTTP service with gateway"
754 );
755 if let Err(e) = self
756 .send_http_registration(
757 client,
758 &group.routes,
759 &group.host,
760 group.port,
761 &group.service_name,
762 true,
763 )
764 .await
765 {
766 error!(
767 service = %self.service_name,
768 error = %e,
769 "Failed to re-register HTTP route group"
770 );
771 return Err(e);
772 }
773 }
774 }
775
776 Ok(())
777 }
778
779 pub fn start_gateway_connection_task(
789 self: Arc<Self>,
790 interval_ms: u64,
791 ) -> tokio::task::JoinHandle<()> {
792 self.start_gateway_connection_task_with_delay(interval_ms, 500)
793 }
794
795 pub fn start_gateway_connection_task_with_delay(
797 self: Arc<Self>,
798 interval_ms: u64,
799 initial_delay_ms: u64,
800 ) -> tokio::task::JoinHandle<()> {
801 tokio::spawn(async move {
802 if initial_delay_ms > 0 {
803 tokio::time::sleep(std::time::Duration::from_millis(initial_delay_ms)).await;
804 }
805
806 let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));
807 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
808 let mut consecutive_failures = 0u32;
809
810 loop {
811 interval.tick().await;
812
813 if let Some(ref client) = self.gateway_client {
814 let health = self.health_response();
815 let request_id = Uuid::new_v4();
816
817 let message = SynapseMessage {
819 protocol_version: PROTOCOL_VERSION.as_u32(),
820 kind: MessageKind::HealthResponse as i32,
821 request_id: Bytes::copy_from_slice(request_id.as_bytes()),
822 message: Some(synapse_message::Message::HealthResponse(health.clone())),
823 };
824
825 match client.send(&message).await {
826 Ok(response) => {
827 let needs_reregister =
829 if let Some((_, ack)) = extract_registration_ack(response) {
830 !ack.success
831 && ack.error_message.contains("INSTANCE_NOT_REGISTERED")
832 } else {
833 false
834 };
835
836 if needs_reregister {
837 info!(
839 service = %self.service_name,
840 "Gateway requested re-registration, re-registering interfaces"
841 );
842
843 if let Err(e) = self.reregister_all().await {
844 error!(
845 service = %self.service_name,
846 error = %e,
847 "Failed to re-register interfaces"
848 );
849 self.gateway_connected.store(false, Ordering::SeqCst);
850 } else {
851 info!(
852 service = %self.service_name,
853 "Successfully re-registered with gateway"
854 );
855 self.gateway_connected.store(true, Ordering::SeqCst);
856 }
857 } else {
858 self.gateway_connected.store(true, Ordering::SeqCst);
860 consecutive_failures = 0;
861 debug!(
862 service = %self.service_name,
863 status = health.status,
864 uptime_ms = health.uptime_ms,
865 "Health status pushed to gateway"
866 );
867 }
868 }
869 Err(e) => {
870 let was_connected =
871 self.gateway_connected.swap(false, Ordering::SeqCst);
872 consecutive_failures += 1;
873
874 if was_connected {
875 warn!(
876 service = %self.service_name,
877 error = %e,
878 "Lost connection to gateway, will attempt to reconnect"
879 );
880 } else if consecutive_failures % 10 == 1 {
881 debug!(
883 service = %self.service_name,
884 failures = consecutive_failures,
885 "Gateway still unavailable, retrying..."
886 );
887 }
888 }
889 }
890 } else {
891 let health = self.get_health_status();
893 debug!(
894 service = %self.service_name,
895 status = ?health.status,
896 "Health check (no gateway configured)"
897 );
898 }
899 }
900 })
901 }
902
903 pub fn start_health_pusher(self: Arc<Self>, interval_ms: u64) -> tokio::task::JoinHandle<()> {
905 self.start_gateway_connection_task(interval_ms)
906 }
907
908 pub fn gateway_client(&self) -> Option<&HttpRpcClient> {
910 self.gateway_client.as_ref()
911 }
912}
913
914fn parse_host_port(endpoint: &str) -> Result<(String, u32)> {
916 let stripped = endpoint
917 .strip_prefix("http://")
918 .or_else(|| endpoint.strip_prefix("https://"))
919 .context("Endpoint must start with http:// or https://")?;
920
921 let authority = stripped.split('/').next().unwrap_or(stripped);
923
924 if authority.is_empty() {
925 anyhow::bail!("Empty host in endpoint URL");
926 }
927
928 if authority.starts_with('[') {
930 let bracket_end = authority
932 .find(']')
933 .context("Invalid IPv6 address: missing closing bracket")?;
934 let host = &authority[..bracket_end + 1];
935 let rest = &authority[bracket_end + 1..];
936 if let Some(port_str) = rest.strip_prefix(':') {
937 let port: u32 = port_str.parse().context("Invalid port in endpoint URL")?;
938 return Ok((host.to_string(), port));
939 }
940 return Ok((host.to_string(), 80));
941 }
942
943 if let Some((host, port_str)) = authority.rsplit_once(':') {
945 if host.is_empty() {
946 anyhow::bail!("Empty host in endpoint URL");
947 }
948 let port: u32 = port_str.parse().context("Invalid port in endpoint URL")?;
949 Ok((host.to_string(), port))
950 } else {
951 Ok((authority.to_string(), 80))
952 }
953}
954
955#[async_trait]
957pub trait RpcInterface: Send + Sync {
958 fn interface_name(&self) -> &str;
960
961 fn method_names(&self) -> &[&'static str];
963
964 async fn handle(&self, request: RpcRequest) -> RpcResponse;
966}
967
968pub struct InterfaceHandler<T: RpcInterface> {
970 interface: Arc<T>,
971}
972
973impl<T: RpcInterface> InterfaceHandler<T> {
974 pub fn new(interface: T) -> Self {
975 Self {
976 interface: Arc::new(interface),
977 }
978 }
979
980 pub fn into_arc(self) -> Arc<Self> {
981 Arc::new(self)
982 }
983}
984
985#[async_trait]
986impl<T: RpcInterface + 'static> RpcHandler for InterfaceHandler<T> {
987 async fn handle(&self, request: RpcRequest) -> RpcResponse {
988 self.interface.handle(request).await
989 }
990}
991
992#[cfg(test)]
993mod tests {
994 use super::*;
995 use synapse_rpc::FunctionHandler;
996
997 #[test]
1000 fn test_builder_sets_name() {
1001 let svc = ServiceBuilder::new("my-service").build();
1002 assert_eq!(svc.name(), "my-service");
1003 }
1004
1005 #[test]
1006 fn test_builder_random_instance_id() {
1007 let s1 = ServiceBuilder::new("svc").build();
1008 let s2 = ServiceBuilder::new("svc").build();
1009 assert_ne!(s1.instance_id(), s2.instance_id());
1010 }
1011
1012 #[test]
1013 fn test_builder_custom_instance_id() {
1014 let id = InstanceId::new_random();
1015 let svc = ServiceBuilder::new("svc").with_instance_id(id).build();
1016 assert_eq!(svc.instance_id(), id);
1017 }
1018
1019 #[test]
1020 fn test_builder_with_http_endpoint() {
1021 let svc = ServiceBuilder::new("svc")
1022 .with_http_endpoint("http://localhost:9001/rpc")
1023 .build();
1024 assert_eq!(
1025 svc.http_endpoint.as_deref(),
1026 Some("http://localhost:9001/rpc")
1027 );
1028 }
1029
1030 #[test]
1031 fn test_builder_with_gateway() {
1032 let svc = ServiceBuilder::new("svc")
1033 .with_gateway("http://localhost:8080")
1034 .build();
1035 assert!(svc.gateway_client().is_some());
1036 }
1037
1038 #[test]
1039 fn test_builder_no_gateway() {
1040 let svc = ServiceBuilder::new("svc").build();
1041 assert!(svc.gateway_client().is_none());
1042 }
1043
1044 #[test]
1045 fn test_builder_chaining() {
1046 let id = InstanceId::new_random();
1047 let svc = ServiceBuilder::new("svc")
1048 .with_instance_id(id)
1049 .with_http_endpoint("http://localhost:9001/rpc")
1050 .with_gateway("http://localhost:8080")
1051 .build();
1052 assert_eq!(svc.name(), "svc");
1053 assert_eq!(svc.instance_id(), id);
1054 assert!(svc.gateway_client().is_some());
1055 }
1056
1057 #[test]
1060 fn test_service_builder_static() {
1061 let builder = Service::builder("test");
1062 let svc = builder.build();
1063 assert_eq!(svc.name(), "test");
1064 }
1065
1066 #[test]
1067 fn test_is_gateway_connected_default_false() {
1068 let svc = ServiceBuilder::new("svc").build();
1069 assert!(!svc.is_gateway_connected());
1070 }
1071
1072 #[test]
1073 fn test_health_response_defaults_healthy() {
1074 let svc = ServiceBuilder::new("svc").build();
1075 let resp = svc.health_response();
1076 assert_eq!(resp.status, synapse_proto::HealthStatus::Healthy as i32);
1077 }
1078
1079 #[test]
1080 fn test_health_response_has_uptime() {
1081 let svc = ServiceBuilder::new("svc").build();
1082 std::thread::sleep(std::time::Duration::from_millis(10));
1083 let resp = svc.health_response();
1084 assert!(resp.uptime_ms >= 10);
1085 }
1086
1087 #[test]
1088 fn test_health_response_has_instance_id() {
1089 let svc = ServiceBuilder::new("svc").build();
1090 let resp = svc.health_response();
1091 assert_eq!(resp.instance_id.len(), 16);
1092 }
1093
1094 #[test]
1095 fn test_set_health_check_custom() {
1096 let mut svc = ServiceBuilder::new("svc").build();
1097 svc.set_health_check(|| HealthStatus::degraded("high load"));
1098 let resp = svc.health_response();
1099 assert_eq!(resp.status, synapse_proto::HealthStatus::Degraded as i32);
1100 assert_eq!(resp.message, "high load");
1101 }
1102
1103 #[test]
1106 fn test_health_status_healthy() {
1107 let h = HealthStatus::healthy();
1108 assert_eq!(h.status, synapse_proto::HealthStatus::Healthy);
1109 assert!(h.message.is_none());
1110 }
1111
1112 #[test]
1113 fn test_health_status_degraded() {
1114 let h = HealthStatus::degraded("slow");
1115 assert_eq!(h.status, synapse_proto::HealthStatus::Degraded);
1116 assert_eq!(h.message.as_deref(), Some("slow"));
1117 }
1118
1119 #[test]
1120 fn test_health_status_unhealthy() {
1121 let h = HealthStatus::unhealthy("broken");
1122 assert_eq!(h.status, synapse_proto::HealthStatus::Unhealthy);
1123 }
1124
1125 #[test]
1126 fn test_health_status_draining() {
1127 let h = HealthStatus::draining("shutting down");
1128 assert_eq!(h.status, synapse_proto::HealthStatus::Draining);
1129 }
1130
1131 #[tokio::test]
1134 async fn test_register_interface_locally() {
1135 let svc = ServiceBuilder::new("svc").build();
1136 let handler = Arc::new(FunctionHandler::new(move |_req| {
1137 Box::pin(async move {
1138 RpcResponse {
1139 status: synapse_proto::RpcStatus::Ok as i32,
1140 payload: Bytes::from("handled"),
1141 error: None,
1142 headers: vec![],
1143 responded_at_unix_ms: 0,
1144 }
1145 })
1146 }));
1147
1148 svc.register_interface("test.Echo", &["Echo"], handler)
1149 .await
1150 .unwrap();
1151
1152 let req = RpcRequest {
1154 interface_id: synapse_primitives::InterfaceId::from_name("test.Echo").into(),
1155 method_id: synapse_primitives::MethodId::from_name("Echo").into(),
1156 headers: vec![],
1157 payload: Bytes::new(),
1158 sent_at_unix_ms: 0,
1159 };
1160 let resp = svc.handle_request(req).await;
1161 assert_eq!(resp.status, synapse_proto::RpcStatus::Ok as i32);
1162 assert_eq!(resp.payload, Bytes::from("handled"));
1163 }
1164
1165 #[tokio::test]
1166 async fn test_handle_request_unknown_interface() {
1167 let svc = ServiceBuilder::new("svc").build();
1168 let req = RpcRequest {
1169 interface_id: 99999,
1170 method_id: 1,
1171 headers: vec![],
1172 payload: Bytes::new(),
1173 sent_at_unix_ms: 0,
1174 };
1175 let resp = svc.handle_request(req).await;
1176 assert_eq!(
1177 resp.status,
1178 synapse_proto::RpcStatus::InterfaceNotFound as i32
1179 );
1180 }
1181}