Skip to main content

synapse_sdk/
service.rs

1//! Service development helpers
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use bytes::Bytes;
6use std::{
7    path::PathBuf,
8    sync::{
9        Arc,
10        atomic::{AtomicBool, Ordering},
11    },
12};
13use synapse_primitives::{InstanceId, Uuid};
14use synapse_proto::{
15    HttpEndpointRegister, HttpMethod, HttpRoute, InstanceCapabilities, InterfaceRegister,
16    MessageKind, RpcRequest, RpcResponse, SynapseMessage, synapse_message,
17};
18use synapse_rpc::{
19    HttpRpcClient, InterfaceRegistration, PROTOCOL_VERSION, RpcHandler, RpcServer,
20    extract_http_registration_ack, extract_registration_ack,
21};
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
/// mTLS configuration for connecting to the gateway.
///
/// The three paths are stored as given and handed to
/// `HttpRpcClient::json_mtls` when the service is built; nothing is
/// validated here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MtlsClientConfig {
    /// Path to the client certificate file.
    pub cert_path: PathBuf,
    /// Path to the client private key file.
    pub key_path: PathBuf,
    /// Path to the CA certificate file.
    pub ca_cert_path: PathBuf,
}
32
33/// Service builder for creating Synapse services
34pub struct ServiceBuilder {
35    service_name: String,
36    instance_id: InstanceId,
37    http_endpoint: Option<String>,
38    gateway_url: Option<String>,
39    mtls_config: Option<MtlsClientConfig>,
40}
41
42impl ServiceBuilder {
43    /// Create a new service builder
44    pub fn new(service_name: impl Into<String>) -> Self {
45        Self {
46            service_name: service_name.into(),
47            instance_id: InstanceId::new_random(),
48            http_endpoint: None,
49            gateway_url: None,
50            mtls_config: None,
51        }
52    }
53
54    /// Set a specific instance ID (useful for testing)
55    pub fn with_instance_id(mut self, instance_id: InstanceId) -> Self {
56        self.instance_id = instance_id;
57        self
58    }
59
60    /// Set the HTTP endpoint where this instance receives RPC traffic
61    pub fn with_http_endpoint(mut self, http_endpoint: impl Into<String>) -> Self {
62        self.http_endpoint = Some(http_endpoint.into());
63        self
64    }
65
66    /// Set the gateway URL for registration (plain HTTP, dev mode only)
67    pub fn with_gateway(mut self, gateway_url: impl Into<String>) -> Self {
68        self.gateway_url = Some(gateway_url.into());
69        self
70    }
71
72    /// Set the gateway URL with mTLS (production)
73    pub fn with_gateway_mtls(
74        mut self,
75        gateway_url: impl Into<String>,
76        cert_path: impl Into<PathBuf>,
77        key_path: impl Into<PathBuf>,
78        ca_cert_path: impl Into<PathBuf>,
79    ) -> Self {
80        self.gateway_url = Some(gateway_url.into());
81        self.mtls_config = Some(MtlsClientConfig {
82            cert_path: cert_path.into(),
83            key_path: key_path.into(),
84            ca_cert_path: ca_cert_path.into(),
85        });
86        self
87    }
88
89    /// Build the service with an RPC server (plain HTTP gateway client)
90    pub fn build(self) -> Service {
91        let gateway_client = match (&self.gateway_url, &self.mtls_config) {
92            (Some(url), Some(mtls)) => {
93                match HttpRpcClient::json_mtls(
94                    url,
95                    &mtls.cert_path,
96                    &mtls.key_path,
97                    &mtls.ca_cert_path,
98                ) {
99                    Ok(client) => Some(client),
100                    Err(e) => {
101                        error!("Failed to create mTLS gateway client: {}", e);
102                        None
103                    }
104                }
105            }
106            (Some(url), None) => Some(HttpRpcClient::json(url)),
107            _ => None,
108        };
109
110        Service {
111            service_name: self.service_name,
112            instance_id: self.instance_id,
113            http_endpoint: self.http_endpoint,
114            rpc_server: Arc::new(RpcServer::new()),
115            gateway_client,
116            health_check: None,
117            start_time: std::time::Instant::now(),
118            registered_interfaces: RwLock::new(Vec::new()),
119            registered_http_routes: RwLock::new(Vec::new()),
120            registered_http_route_groups: RwLock::new(Vec::new()),
121            gateway_connected: AtomicBool::new(false),
122        }
123    }
124}
125
126/// Health check callback that services implement to report their status
127pub type HealthCheckFn = Arc<dyn Fn() -> HealthStatus + Send + Sync>;
128
129/// Health status that a service can report
130#[derive(Debug, Clone)]
131pub struct HealthStatus {
132    pub status: synapse_proto::HealthStatus,
133    pub message: Option<String>,
134}
135
136impl HealthStatus {
137    pub fn healthy() -> Self {
138        Self {
139            status: synapse_proto::HealthStatus::Healthy,
140            message: None,
141        }
142    }
143
144    pub fn degraded(message: impl Into<String>) -> Self {
145        Self {
146            status: synapse_proto::HealthStatus::Degraded,
147            message: Some(message.into()),
148        }
149    }
150
151    pub fn unhealthy(message: impl Into<String>) -> Self {
152        Self {
153            status: synapse_proto::HealthStatus::Unhealthy,
154            message: Some(message.into()),
155        }
156    }
157
158    pub fn draining(message: impl Into<String>) -> Self {
159        Self {
160            status: synapse_proto::HealthStatus::Draining,
161            message: Some(message.into()),
162        }
163    }
164}
165
/// How the gateway formats error responses for a route.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RouteResponseType {
    /// Structured JSON error bodies; the default, intended for API routes.
    #[default]
    Json,
    /// Rendered HTML error pages, intended for user-facing web routes.
    Html,
}
175
176/// Configuration for an HTTP route to register with the gateway
177#[derive(Debug, Clone)]
178pub struct HttpRouteConfig {
179    /// Path prefix for routing (e.g., "/api/users")
180    pub path: String,
181    /// HTTP methods (empty = all methods)
182    pub methods: Vec<HttpMethod>,
183    /// Subdomain pattern: "" (default, bare domain), "any", "exact:domain", "prefix:x-", "suffix:.x"
184    pub subdomain: String,
185    /// Middleware names to apply (resolved from gateway config definitions)
186    pub middleware: Vec<String>,
187    /// If true, this route is a metrics endpoint (registered for Prometheus discovery, not public routing)
188    pub metrics: bool,
189    /// Error response format for this route (default: JSON)
190    pub response_type: RouteResponseType,
191}
192
193impl HttpRouteConfig {
194    /// Create a new route config for the given path (all methods, bare domain)
195    pub fn new(path: impl Into<String>) -> Self {
196        Self {
197            path: path.into(),
198            methods: vec![],
199            subdomain: String::new(),
200            middleware: vec![],
201            metrics: false,
202            response_type: RouteResponseType::Json,
203        }
204    }
205
206    /// Restrict to specific HTTP methods
207    pub fn with_methods(mut self, methods: Vec<HttpMethod>) -> Self {
208        self.methods = methods;
209        self
210    }
211
212    /// Set subdomain pattern
213    pub fn with_subdomain(mut self, subdomain: impl Into<String>) -> Self {
214        self.subdomain = subdomain.into();
215        self
216    }
217
218    /// Set middleware names to apply to this route
219    pub fn with_middleware(mut self, middleware: Vec<String>) -> Self {
220        self.middleware = middleware;
221        self
222    }
223
224    /// Mark this route as a metrics endpoint for Prometheus discovery
225    pub fn as_metrics(mut self) -> Self {
226        self.metrics = true;
227        self
228    }
229
230    /// Mark this route as serving HTML (error pages will be HTML instead of JSON)
231    pub fn as_html(mut self) -> Self {
232        self.response_type = RouteResponseType::Html;
233        self
234    }
235}
236
237/// A batch of HTTP routes targeting a specific host:port.
238/// Used to register routes that proxy to a different backend than the service itself
239/// (e.g., a frontend dev server, static file server, or external API).
240///
241/// Each group needs its own `service_name` because the gateway maps
242/// service_name → endpoint URL (one URL per service). Routes targeting
243/// different backends must use different service names.
244#[derive(Debug, Clone)]
245pub struct HttpRouteGroup {
246    /// Service name for this group (used by the gateway for endpoint lookup)
247    pub service_name: String,
248    /// Target host (e.g., "127.0.0.1")
249    pub host: String,
250    /// Target port (e.g., 3222)
251    pub port: u32,
252    /// Routes to register for this target
253    pub routes: Vec<HttpRouteConfig>,
254}
255
256impl HttpRouteGroup {
257    pub fn new(
258        service_name: impl Into<String>,
259        host: impl Into<String>,
260        port: u32,
261        routes: Vec<HttpRouteConfig>,
262    ) -> Self {
263        Self {
264            service_name: service_name.into(),
265            host: host.into(),
266            port,
267            routes,
268        }
269    }
270}
271
272/// Stored registration info for reconnection
273#[derive(Clone)]
274struct RegisteredInterface {
275    registration: InterfaceRegistration,
276    #[allow(dead_code)]
277    handler: Arc<dyn RpcHandler>,
278}
279
280/// A Synapse service instance
281pub struct Service {
282    service_name: String,
283    instance_id: InstanceId,
284    http_endpoint: Option<String>,
285    rpc_server: Arc<RpcServer>,
286    gateway_client: Option<HttpRpcClient>,
287    health_check: Option<HealthCheckFn>,
288    start_time: std::time::Instant,
289    /// Track registered interfaces for re-registration on reconnect
290    registered_interfaces: RwLock<Vec<RegisteredInterface>>,
291    /// Track registered HTTP routes for re-registration on reconnect
292    registered_http_routes: RwLock<Vec<HttpRouteConfig>>,
293    /// Track registered HTTP route groups (arbitrary host:port) for re-registration
294    registered_http_route_groups: RwLock<Vec<HttpRouteGroup>>,
295    /// Whether we're currently connected to gateway
296    gateway_connected: AtomicBool,
297}
298
299impl Service {
300    /// Create a new service builder
301    pub fn builder(service_name: impl Into<String>) -> ServiceBuilder {
302        ServiceBuilder::new(service_name)
303    }
304
305    /// Get the service name
306    pub fn name(&self) -> &str {
307        &self.service_name
308    }
309
310    /// Get the instance ID
311    pub fn instance_id(&self) -> InstanceId {
312        self.instance_id
313    }
314
315    /// Get access to the RPC server
316    pub fn rpc_server(&self) -> &Arc<RpcServer> {
317        &self.rpc_server
318    }
319
320    /// Register a codegen-generated interface
321    ///
322    /// Takes the `(InterfaceRegistration, Arc<dyn RpcHandler>)` tuple returned by
323    /// `FooServiceRouter::create(impl)` and registers it locally + with the gateway.
324    ///
325    /// This is the preferred way to register interfaces — no need to repeat
326    /// interface names or method names manually.
327    ///
328    /// # Example
329    /// ```ignore
330    /// let impl_ = MyServiceImpl::new();
331    /// let (reg, handler) = MyServiceRouter::create(impl_);
332    /// service.register(reg, handler).await?;
333    /// ```
334    pub async fn register(
335        &self,
336        mut registration: InterfaceRegistration,
337        handler: Arc<dyn RpcHandler>,
338    ) -> Result<()> {
339        // Override instance_id and service_name with this service's values
340        registration.instance_id = self.instance_id;
341        registration.service_name = self.service_name.clone();
342
343        self.register_inner(registration, handler).await
344    }
345
346    /// Register an interface implementation from string parameters
347    ///
348    /// Prefer [`register`] when using codegen-generated routers.
349    /// This method is useful for dynamic/manual interface registration.
350    pub async fn register_interface(
351        &self,
352        interface_name: &str,
353        method_names: &[&str],
354        handler: Arc<dyn RpcHandler>,
355    ) -> Result<()> {
356        let registration = InterfaceRegistration::new(
357            interface_name,
358            &self.service_name,
359            method_names,
360            self.instance_id,
361        );
362
363        self.register_inner(registration, handler).await
364    }
365
366    /// Internal: register an interface with a fully-built registration
367    async fn register_inner(
368        &self,
369        registration: InterfaceRegistration,
370        handler: Arc<dyn RpcHandler>,
371    ) -> Result<()> {
372        info!(
373            service = %self.service_name,
374            interface = %registration.interface_name,
375            interface_id = ?registration.interface_id,
376            "Registering interface locally"
377        );
378
379        // Register locally
380        self.rpc_server
381            .registry()
382            .register(registration.clone(), handler.clone())
383            .await
384            .map_err(|e| anyhow::anyhow!("Failed to register interface locally: {}", e))?;
385
386        // Store for reconnection
387        {
388            let mut interfaces = self.registered_interfaces.write().await;
389            interfaces.push(RegisteredInterface {
390                registration: registration.clone(),
391                handler,
392            });
393        }
394
395        // Register with gateway if configured
396        if let Some(ref client) = self.gateway_client {
397            match self.register_with_gateway(client, &registration).await {
398                Ok(()) => {
399                    self.gateway_connected.store(true, Ordering::SeqCst);
400                }
401                Err(e) => {
402                    self.gateway_connected.store(false, Ordering::SeqCst);
403                    // Don't fail - we'll retry via the reconnection task
404                    warn!(
405                        service = %self.service_name,
406                        error = %e,
407                        "Failed to register with gateway (will retry)"
408                    );
409                }
410            }
411        }
412
413        Ok(())
414    }
415
416    /// Register HTTP routes with the gateway, proxying to this service's own endpoint.
417    ///
418    /// Sends an `HttpEndpointRegister` message to the gateway, which will
419    /// add the routes to the gateway's HTTP router. The gateway must have
420    /// appropriate `http_registration_permissions` configured to allow this.
421    ///
422    /// Routes are stored and automatically re-registered if the gateway
423    /// connection is lost and re-established.
424    pub async fn register_http_routes(&self, routes: Vec<HttpRouteConfig>) -> Result<()> {
425        let Some(ref client) = self.gateway_client else {
426            return Err(anyhow::anyhow!(
427                "Cannot register HTTP routes: no gateway client configured"
428            ));
429        };
430
431        // Store routes for reconnection
432        {
433            let mut stored = self.registered_http_routes.write().await;
434            stored.extend(routes.iter().cloned());
435        }
436
437        let endpoint = self.http_endpoint.clone().unwrap_or_default();
438        let (host, port) = parse_host_port(&endpoint)?;
439        self.send_http_registration(client, &routes, &host, port, &self.service_name, false)
440            .await
441    }
442
443    /// Register HTTP routes with the gateway, proxying to an arbitrary host:port.
444    ///
445    /// Use this to register routes for services that aren't Synapse-connected
446    /// (e.g., a frontend dev server, static file server, or external API).
447    /// The gateway will proxy matching requests to the specified target.
448    ///
449    /// Each group has its own `service_name` because the gateway maps
450    /// service_name → endpoint URL. Different backends need different names.
451    ///
452    /// Routes are stored and automatically re-registered on reconnect.
453    pub async fn register_http_route_group(&self, group: HttpRouteGroup) -> Result<()> {
454        let Some(ref client) = self.gateway_client else {
455            return Err(anyhow::anyhow!(
456                "Cannot register HTTP routes: no gateway client configured"
457            ));
458        };
459
460        // Store for reconnection
461        {
462            let mut stored = self.registered_http_route_groups.write().await;
463            stored.push(group.clone());
464        }
465
466        self.send_http_registration(
467            client,
468            &group.routes,
469            &group.host,
470            group.port,
471            &group.service_name,
472            true,
473        )
474        .await
475    }
476
477    /// Internal: send HTTP route registration to gateway with explicit host:port and service name
478    async fn send_http_registration(
479        &self,
480        client: &HttpRpcClient,
481        routes: &[HttpRouteConfig],
482        host: &str,
483        port: u32,
484        service_name: &str,
485        delegated: bool,
486    ) -> Result<()> {
487        let proto_routes: Vec<HttpRoute> = routes
488            .iter()
489            .map(|r| HttpRoute {
490                path_prefix: r.path.clone(),
491                methods: r.methods.iter().map(|m| *m as i32).collect(),
492                options: None,
493                subdomain: r.subdomain.clone(),
494                middleware: r.middleware.clone(),
495                metrics: r.metrics,
496                response_type: match r.response_type {
497                    RouteResponseType::Json => synapse_proto::ResponseType::Json as i32,
498                    RouteResponseType::Html => synapse_proto::ResponseType::Html as i32,
499                },
500            })
501            .collect();
502
503        let register = HttpEndpointRegister {
504            instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
505            routes: proto_routes,
506            host: host.to_string(),
507            port,
508            health_check_path: String::new(),
509            health_check_interval_ms: 0,
510            service_name: service_name.to_string(),
511            delegated,
512        };
513
514        let request_id = Uuid::new_v4();
515        let message = SynapseMessage {
516            protocol_version: PROTOCOL_VERSION.as_u32(),
517            kind: MessageKind::HttpEndpointRegister as i32,
518            request_id: Bytes::copy_from_slice(request_id.as_bytes()),
519            message: Some(synapse_message::Message::HttpEndpointRegister(register)),
520        };
521
522        let response = client
523            .send(&message)
524            .await
525            .context("Failed to send HTTP registration to gateway")?;
526
527        if let Some((_, ack)) = extract_http_registration_ack(response) {
528            if ack.success {
529                info!(
530                    service = %service_name,
531                    host = %host,
532                    port = %port,
533                    routes = routes.len(),
534                    "HTTP routes registered with gateway"
535                );
536                Ok(())
537            } else {
538                error!(
539                    service = %service_name,
540                    error = %ack.error_message,
541                    "Gateway rejected HTTP route registration"
542                );
543                Err(anyhow::anyhow!(
544                    "Gateway rejected HTTP route registration: {}",
545                    ack.error_message
546                ))
547            }
548        } else {
549            Err(anyhow::anyhow!(
550                "Unexpected response from gateway during HTTP registration"
551            ))
552        }
553    }
554
555    /// Register an interface with the gateway
556    async fn register_with_gateway(
557        &self,
558        client: &HttpRpcClient,
559        registration: &InterfaceRegistration,
560    ) -> Result<()> {
561        let http_endpoint = self.http_endpoint.clone().unwrap_or_default();
562
563        info!(
564            service = %self.service_name,
565            interface = %registration.interface_name,
566            endpoint = %http_endpoint,
567            "Registering interface with gateway"
568        );
569
570        // Build the InterfaceRegister proto message
571        let register = InterfaceRegister {
572            interface_id: registration.interface_id.into(),
573            interface_version: registration.interface_version,
574            method_ids: registration
575                .method_ids
576                .iter()
577                .map(|m| (*m).into())
578                .collect(),
579            instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
580            capabilities: Some(InstanceCapabilities::default()),
581            service_name: self.service_name.clone(),
582            interface_name: registration.interface_name.clone(),
583            http_endpoint,
584            method_names: registration.method_names.clone(),
585        };
586
587        // Create the SynapseMessage
588        let request_id = Uuid::new_v4();
589        let message = SynapseMessage {
590            protocol_version: PROTOCOL_VERSION.as_u32(),
591            kind: MessageKind::InterfaceRegister as i32,
592            request_id: Bytes::copy_from_slice(request_id.as_bytes()),
593            message: Some(synapse_message::Message::InterfaceRegister(register)),
594        };
595
596        // Send to gateway
597        let response = client
598            .send(&message)
599            .await
600            .context("Failed to send registration to gateway")?;
601
602        // Check the response
603        if let Some((_, ack)) = extract_registration_ack(response) {
604            if ack.success {
605                info!(
606                    service = %self.service_name,
607                    interface = %registration.interface_name,
608                    "Successfully registered with gateway"
609                );
610                Ok(())
611            } else {
612                error!(
613                    service = %self.service_name,
614                    interface = %registration.interface_name,
615                    error = %ack.error_message,
616                    "Gateway rejected registration"
617                );
618                Err(anyhow::anyhow!(
619                    "Gateway rejected registration: {}",
620                    ack.error_message
621                ))
622            }
623        } else {
624            warn!(
625                service = %self.service_name,
626                "Unexpected response from gateway during registration"
627            );
628            Err(anyhow::anyhow!("Unexpected response from gateway"))
629        }
630    }
631
632    /// Handle an RPC request
633    pub async fn handle_request(&self, request: RpcRequest) -> RpcResponse {
634        self.rpc_server.handle_request(request).await
635    }
636
637    /// Set a custom health check function
638    pub fn set_health_check<F>(&mut self, check: F)
639    where
640        F: Fn() -> HealthStatus + Send + Sync + 'static,
641    {
642        self.health_check = Some(Arc::new(check));
643    }
644
645    /// Get current health status
646    fn get_health_status(&self) -> HealthStatus {
647        if let Some(ref health_check) = self.health_check {
648            health_check()
649        } else {
650            HealthStatus::healthy()
651        }
652    }
653
654    /// Create a HealthResponse message
655    pub fn health_response(&self) -> synapse_proto::HealthResponse {
656        let health = self.get_health_status();
657        let uptime_ms = self.start_time.elapsed().as_millis() as i64;
658
659        synapse_proto::HealthResponse {
660            instance_id: Bytes::copy_from_slice(&self.instance_id.as_bytes()),
661            status: health.status as i32,
662            version: env!("CARGO_PKG_VERSION").to_string(),
663            uptime_ms,
664            message: health.message.unwrap_or_default(),
665        }
666    }
667
668    /// Check if currently connected to gateway
669    pub fn is_gateway_connected(&self) -> bool {
670        self.gateway_connected.load(Ordering::SeqCst)
671    }
672
673    /// Re-register all interfaces and HTTP routes with the gateway
674    async fn reregister_all(&self) -> Result<()> {
675        let Some(ref client) = self.gateway_client else {
676            return Ok(());
677        };
678
679        // Re-register RPC interfaces
680        let interfaces = self.registered_interfaces.read().await;
681        if !interfaces.is_empty() {
682            info!(
683                service = %self.service_name,
684                count = interfaces.len(),
685                "Re-registering interfaces with gateway"
686            );
687
688            for iface in interfaces.iter() {
689                match self
690                    .register_with_gateway(client, &iface.registration)
691                    .await
692                {
693                    Ok(()) => {
694                        info!(
695                            service = %self.service_name,
696                            interface = %iface.registration.interface_name,
697                            "Re-registered interface"
698                        );
699                    }
700                    Err(e) => {
701                        error!(
702                            service = %self.service_name,
703                            interface = %iface.registration.interface_name,
704                            error = %e,
705                            "Failed to re-register interface"
706                        );
707                        return Err(e);
708                    }
709                }
710            }
711        }
712        drop(interfaces);
713
714        // Re-register HTTP routes (own endpoint)
715        let http_routes = self.registered_http_routes.read().await;
716        if !http_routes.is_empty() {
717            info!(
718                service = %self.service_name,
719                count = http_routes.len(),
720                "Re-registering HTTP routes with gateway"
721            );
722            let routes: Vec<HttpRouteConfig> = http_routes.clone();
723            drop(http_routes);
724
725            let endpoint = self.http_endpoint.clone().unwrap_or_default();
726            let (host, port) = parse_host_port(&endpoint)?;
727            if let Err(e) = self
728                .send_http_registration(client, &routes, &host, port, &self.service_name, false)
729                .await
730            {
731                error!(
732                    service = %self.service_name,
733                    error = %e,
734                    "Failed to re-register HTTP routes"
735                );
736                return Err(e);
737            }
738        } else {
739            drop(http_routes);
740        }
741
742        // Re-register HTTP route groups (delegated services)
743        let route_groups = self.registered_http_route_groups.read().await;
744        if !route_groups.is_empty() {
745            let groups: Vec<HttpRouteGroup> = route_groups.clone();
746            drop(route_groups);
747
748            for group in &groups {
749                info!(
750                    service = %self.service_name,
751                    target = %format!("{}:{}", group.host, group.port),
752                    count = group.routes.len(),
753                    "Re-registering delegated HTTP service with gateway"
754                );
755                if let Err(e) = self
756                    .send_http_registration(
757                        client,
758                        &group.routes,
759                        &group.host,
760                        group.port,
761                        &group.service_name,
762                        true,
763                    )
764                    .await
765                {
766                    error!(
767                        service = %self.service_name,
768                        error = %e,
769                        "Failed to re-register HTTP route group"
770                    );
771                    return Err(e);
772                }
773            }
774        }
775
776        Ok(())
777    }
778
779    /// Start background task that pushes health status and handles reconnection
780    ///
781    /// This task:
782    /// - Sends health status to gateway periodically
783    /// - Detects when gateway connection is lost
784    /// - Automatically re-registers all interfaces and HTTP routes when gateway comes back
785    ///
786    /// `interval_ms` controls the health push frequency.
787    /// `initial_delay_ms` controls the initial startup delay (default 500ms if 0).
788    pub fn start_gateway_connection_task(
789        self: Arc<Self>,
790        interval_ms: u64,
791    ) -> tokio::task::JoinHandle<()> {
792        self.start_gateway_connection_task_with_delay(interval_ms, 500)
793    }
794
795    /// Like `start_gateway_connection_task` but with a custom initial delay
796    pub fn start_gateway_connection_task_with_delay(
797        self: Arc<Self>,
798        interval_ms: u64,
799        initial_delay_ms: u64,
800    ) -> tokio::task::JoinHandle<()> {
801        tokio::spawn(async move {
802            if initial_delay_ms > 0 {
803                tokio::time::sleep(std::time::Duration::from_millis(initial_delay_ms)).await;
804            }
805
806            let mut interval = tokio::time::interval(std::time::Duration::from_millis(interval_ms));
807            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
808            let mut consecutive_failures = 0u32;
809
810            loop {
811                interval.tick().await;
812
813                if let Some(ref client) = self.gateway_client {
814                    let health = self.health_response();
815                    let request_id = Uuid::new_v4();
816
817                    // Create health response message to push to gateway
818                    let message = SynapseMessage {
819                        protocol_version: PROTOCOL_VERSION.as_u32(),
820                        kind: MessageKind::HealthResponse as i32,
821                        request_id: Bytes::copy_from_slice(request_id.as_bytes()),
822                        message: Some(synapse_message::Message::HealthResponse(health.clone())),
823                    };
824
825                    match client.send(&message).await {
826                        Ok(response) => {
827                            // Check if gateway says we need to re-register
828                            let needs_reregister =
829                                if let Some((_, ack)) = extract_registration_ack(response) {
830                                    !ack.success
831                                        && ack.error_message.contains("INSTANCE_NOT_REGISTERED")
832                                } else {
833                                    false
834                                };
835
836                            if needs_reregister {
837                                // Gateway doesn't know us - re-register
838                                info!(
839                                    service = %self.service_name,
840                                    "Gateway requested re-registration, re-registering interfaces"
841                                );
842
843                                if let Err(e) = self.reregister_all().await {
844                                    error!(
845                                        service = %self.service_name,
846                                        error = %e,
847                                        "Failed to re-register interfaces"
848                                    );
849                                    self.gateway_connected.store(false, Ordering::SeqCst);
850                                } else {
851                                    info!(
852                                        service = %self.service_name,
853                                        "Successfully re-registered with gateway"
854                                    );
855                                    self.gateway_connected.store(true, Ordering::SeqCst);
856                                }
857                            } else {
858                                // Normal health ack
859                                self.gateway_connected.store(true, Ordering::SeqCst);
860                                consecutive_failures = 0;
861                                debug!(
862                                    service = %self.service_name,
863                                    status = health.status,
864                                    uptime_ms = health.uptime_ms,
865                                    "Health status pushed to gateway"
866                                );
867                            }
868                        }
869                        Err(e) => {
870                            let was_connected =
871                                self.gateway_connected.swap(false, Ordering::SeqCst);
872                            consecutive_failures += 1;
873
874                            if was_connected {
875                                warn!(
876                                    service = %self.service_name,
877                                    error = %e,
878                                    "Lost connection to gateway, will attempt to reconnect"
879                                );
880                            } else if consecutive_failures % 10 == 1 {
881                                // Log every 10th failure to avoid spam
882                                debug!(
883                                    service = %self.service_name,
884                                    failures = consecutive_failures,
885                                    "Gateway still unavailable, retrying..."
886                                );
887                            }
888                        }
889                    }
890                } else {
891                    // No gateway configured, just log locally
892                    let health = self.get_health_status();
893                    debug!(
894                        service = %self.service_name,
895                        status = ?health.status,
896                        "Health check (no gateway configured)"
897                    );
898                }
899            }
900        })
901    }
902
903    /// Start background health push task (legacy alias for start_gateway_connection_task)
904    pub fn start_health_pusher(self: Arc<Self>, interval_ms: u64) -> tokio::task::JoinHandle<()> {
905        self.start_gateway_connection_task(interval_ms)
906    }
907
908    /// Get access to the gateway client (if configured)
909    pub fn gateway_client(&self) -> Option<&HttpRpcClient> {
910        self.gateway_client.as_ref()
911    }
912}
913
914/// Parse host and port from a URL string like "http://127.0.0.1:9001/rpc"
915fn parse_host_port(endpoint: &str) -> Result<(String, u32)> {
916    let stripped = endpoint
917        .strip_prefix("http://")
918        .or_else(|| endpoint.strip_prefix("https://"))
919        .context("Endpoint must start with http:// or https://")?;
920
921    // Take everything before the first '/'
922    let authority = stripped.split('/').next().unwrap_or(stripped);
923
924    if authority.is_empty() {
925        anyhow::bail!("Empty host in endpoint URL");
926    }
927
928    // Handle IPv6 addresses like [::1]:9001
929    if authority.starts_with('[') {
930        // Find closing bracket
931        let bracket_end = authority
932            .find(']')
933            .context("Invalid IPv6 address: missing closing bracket")?;
934        let host = &authority[..bracket_end + 1];
935        let rest = &authority[bracket_end + 1..];
936        if let Some(port_str) = rest.strip_prefix(':') {
937            let port: u32 = port_str.parse().context("Invalid port in endpoint URL")?;
938            return Ok((host.to_string(), port));
939        }
940        return Ok((host.to_string(), 80));
941    }
942
943    // Standard host:port
944    if let Some((host, port_str)) = authority.rsplit_once(':') {
945        if host.is_empty() {
946            anyhow::bail!("Empty host in endpoint URL");
947        }
948        let port: u32 = port_str.parse().context("Invalid port in endpoint URL")?;
949        Ok((host.to_string(), port))
950    } else {
951        Ok((authority.to_string(), 80))
952    }
953}
954
/// Trait for implementing RPC interfaces
///
/// An implementor supplies three things:
/// - an interface name;
/// - the method names it serves;
/// - a single async `handle` entry point that receives every request for the
///   interface.
///
/// To use an implementor where an [`RpcHandler`] is expected, wrap it in
/// [`InterfaceHandler`], which forwards each request to [`RpcInterface::handle`].
///
/// The `Send + Sync` supertraits allow an implementor to be shared behind the
/// `Arc` that [`InterfaceHandler`] stores.
#[async_trait]
pub trait RpcInterface: Send + Sync {
    /// Get the interface name (e.g., "mensa.user.v1.UserInterface")
    ///
    /// NOTE(review): presumably this is the name used at registration time and
    /// hashed into the request's interface id — confirm against the registration code.
    fn interface_name(&self) -> &str;

    /// Get the list of method names served by this interface.
    fn method_names(&self) -> &[&'static str];

    /// Handle an RPC request
    ///
    /// A single entry point covers every method of the interface, so the
    /// implementor must inspect the request (e.g. its `method_id`) to decide what
    /// to run. The method always produces an `RpcResponse`: failures are reported
    /// in the response itself rather than through a `Result`.
    async fn handle(&self, request: RpcRequest) -> RpcResponse;
}
967
/// Helper to wrap an RpcInterface as an RpcHandler
///
/// The wrapped interface is held behind an `Arc`. Each `RpcHandler::handle` call
/// is forwarded unchanged to the interface's `RpcInterface::handle`.
pub struct InterfaceHandler<T: RpcInterface> {
    // The interface that receives forwarded requests.
    interface: Arc<T>,
}
972
973impl<T: RpcInterface> InterfaceHandler<T> {
974    pub fn new(interface: T) -> Self {
975        Self {
976            interface: Arc::new(interface),
977        }
978    }
979
980    pub fn into_arc(self) -> Arc<Self> {
981        Arc::new(self)
982    }
983}
984
985#[async_trait]
986impl<T: RpcInterface + 'static> RpcHandler for InterfaceHandler<T> {
987    async fn handle(&self, request: RpcRequest) -> RpcResponse {
988        self.interface.handle(request).await
989    }
990}
991
#[cfg(test)]
mod tests {
    use super::*;
    use synapse_rpc::FunctionHandler;

    // ========== ServiceBuilder ==========

    #[test]
    fn test_builder_sets_name() {
        let svc = ServiceBuilder::new("my-service").build();
        assert_eq!(svc.name(), "my-service");
    }

    #[test]
    fn test_builder_random_instance_id() {
        let s1 = ServiceBuilder::new("svc").build();
        let s2 = ServiceBuilder::new("svc").build();
        assert_ne!(s1.instance_id(), s2.instance_id());
    }

    #[test]
    fn test_builder_custom_instance_id() {
        let id = InstanceId::new_random();
        let svc = ServiceBuilder::new("svc").with_instance_id(id).build();
        assert_eq!(svc.instance_id(), id);
    }

    #[test]
    fn test_builder_with_http_endpoint() {
        let svc = ServiceBuilder::new("svc")
            .with_http_endpoint("http://localhost:9001/rpc")
            .build();
        assert_eq!(
            svc.http_endpoint.as_deref(),
            Some("http://localhost:9001/rpc")
        );
    }

    #[test]
    fn test_builder_with_gateway() {
        let svc = ServiceBuilder::new("svc")
            .with_gateway("http://localhost:8080")
            .build();
        assert!(svc.gateway_client().is_some());
    }

    #[test]
    fn test_builder_no_gateway() {
        let svc = ServiceBuilder::new("svc").build();
        assert!(svc.gateway_client().is_none());
    }

    #[test]
    fn test_builder_chaining() {
        let id = InstanceId::new_random();
        let svc = ServiceBuilder::new("svc")
            .with_instance_id(id)
            .with_http_endpoint("http://localhost:9001/rpc")
            .with_gateway("http://localhost:8080")
            .build();
        assert_eq!(svc.name(), "svc");
        assert_eq!(svc.instance_id(), id);
        assert!(svc.gateway_client().is_some());
    }

    // ========== Service ==========

    #[test]
    fn test_service_builder_static() {
        let builder = Service::builder("test");
        let svc = builder.build();
        assert_eq!(svc.name(), "test");
    }

    #[test]
    fn test_is_gateway_connected_default_false() {
        let svc = ServiceBuilder::new("svc").build();
        assert!(!svc.is_gateway_connected());
    }

    #[test]
    fn test_health_response_defaults_healthy() {
        let svc = ServiceBuilder::new("svc").build();
        let resp = svc.health_response();
        assert_eq!(resp.status, synapse_proto::HealthStatus::Healthy as i32);
    }

    #[test]
    fn test_health_response_has_uptime() {
        let svc = ServiceBuilder::new("svc").build();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let resp = svc.health_response();
        assert!(resp.uptime_ms >= 10);
    }

    #[test]
    fn test_health_response_has_instance_id() {
        let svc = ServiceBuilder::new("svc").build();
        let resp = svc.health_response();
        assert_eq!(resp.instance_id.len(), 16);
    }

    #[test]
    fn test_set_health_check_custom() {
        let mut svc = ServiceBuilder::new("svc").build();
        svc.set_health_check(|| HealthStatus::degraded("high load"));
        let resp = svc.health_response();
        assert_eq!(resp.status, synapse_proto::HealthStatus::Degraded as i32);
        assert_eq!(resp.message, "high load");
    }

    // ========== HealthStatus ==========

    #[test]
    fn test_health_status_healthy() {
        let h = HealthStatus::healthy();
        assert_eq!(h.status, synapse_proto::HealthStatus::Healthy);
        assert!(h.message.is_none());
    }

    #[test]
    fn test_health_status_degraded() {
        let h = HealthStatus::degraded("slow");
        assert_eq!(h.status, synapse_proto::HealthStatus::Degraded);
        assert_eq!(h.message.as_deref(), Some("slow"));
    }

    #[test]
    fn test_health_status_unhealthy() {
        let h = HealthStatus::unhealthy("broken");
        assert_eq!(h.status, synapse_proto::HealthStatus::Unhealthy);
    }

    #[test]
    fn test_health_status_draining() {
        let h = HealthStatus::draining("shutting down");
        assert_eq!(h.status, synapse_proto::HealthStatus::Draining);
    }

    // ========== parse_host_port ==========

    #[test]
    fn test_parse_host_port_ipv4_with_port_and_path() {
        let (host, port) = parse_host_port("http://127.0.0.1:9001/rpc").unwrap();
        assert_eq!(host, "127.0.0.1");
        assert_eq!(port, 9001);
    }

    #[test]
    fn test_parse_host_port_http_without_port_defaults_to_80() {
        let (host, port) = parse_host_port("http://localhost/rpc").unwrap();
        assert_eq!(host, "localhost");
        assert_eq!(port, 80);
    }

    #[test]
    fn test_parse_host_port_https_with_explicit_port() {
        let (host, port) = parse_host_port("https://example.com:8443/rpc").unwrap();
        assert_eq!(host, "example.com");
        assert_eq!(port, 8443);
    }

    #[test]
    fn test_parse_host_port_bracketed_ipv6_keeps_brackets() {
        let (host, port) = parse_host_port("http://[::1]:9001/rpc").unwrap();
        assert_eq!(host, "[::1]");
        assert_eq!(port, 9001);
    }

    #[test]
    fn test_parse_host_port_rejects_invalid_input() {
        for bad in [
            "ftp://host:21",    // unsupported scheme
            "127.0.0.1:9001",   // missing scheme
            "http:///rpc",      // empty authority
            "http://:9001",     // empty host
            "http://host:abc",  // non-numeric port
            "http://[::1:9001", // unterminated IPv6 literal
        ] {
            assert!(parse_host_port(bad).is_err(), "expected error for {bad:?}");
        }
    }

    // ========== register_interface + handle_request ==========

    #[tokio::test]
    async fn test_register_interface_locally() {
        let svc = ServiceBuilder::new("svc").build();
        let handler = Arc::new(FunctionHandler::new(move |_req| {
            Box::pin(async move {
                RpcResponse {
                    status: synapse_proto::RpcStatus::Ok as i32,
                    payload: Bytes::from("handled"),
                    error: None,
                    headers: vec![],
                    responded_at_unix_ms: 0,
                }
            })
        }));

        svc.register_interface("test.Echo", &["Echo"], handler)
            .await
            .unwrap();

        // Verify we can route to it
        let req = RpcRequest {
            interface_id: synapse_primitives::InterfaceId::from_name("test.Echo").into(),
            method_id: synapse_primitives::MethodId::from_name("Echo").into(),
            headers: vec![],
            payload: Bytes::new(),
            sent_at_unix_ms: 0,
        };
        let resp = svc.handle_request(req).await;
        assert_eq!(resp.status, synapse_proto::RpcStatus::Ok as i32);
        assert_eq!(resp.payload, Bytes::from("handled"));
    }

    #[tokio::test]
    async fn test_handle_request_unknown_interface() {
        let svc = ServiceBuilder::new("svc").build();
        let req = RpcRequest {
            interface_id: 99999,
            method_id: 1,
            headers: vec![],
            payload: Bytes::new(),
            sent_at_unix_ms: 0,
        };
        let resp = svc.handle_request(req).await;
        assert_eq!(
            resp.status,
            synapse_proto::RpcStatus::InterfaceNotFound as i32
        );
    }

    // ========== InterfaceHandler ==========

    /// Minimal interface that echoes the request payload back.
    struct EchoInterface;

    #[async_trait::async_trait]
    impl RpcInterface for EchoInterface {
        fn interface_name(&self) -> &str {
            "test.Echo"
        }

        fn method_names(&self) -> &[&'static str] {
            &["Echo"]
        }

        async fn handle(&self, request: RpcRequest) -> RpcResponse {
            RpcResponse {
                status: synapse_proto::RpcStatus::Ok as i32,
                payload: request.payload,
                error: None,
                headers: vec![],
                responded_at_unix_ms: 0,
            }
        }
    }

    #[tokio::test]
    async fn test_interface_handler_forwards_to_interface() {
        let handler = InterfaceHandler::new(EchoInterface).into_arc();
        let req = RpcRequest {
            interface_id: 1,
            method_id: 1,
            headers: vec![],
            payload: Bytes::from("ping"),
            sent_at_unix_ms: 0,
        };
        // Call through the RpcHandler trait, as the service would.
        let resp = RpcHandler::handle(&*handler, req).await;
        assert_eq!(resp.status, synapse_proto::RpcStatus::Ok as i32);
        assert_eq!(resp.payload, Bytes::from("ping"));
    }
}