Skip to main content

middleware_core/stack/
transport.rs

1use core_types::{ErrorCode, ErrorDomain, RtError, SessionId, TransportDomain};
2use transport_core::{Endpoint, TransportAdapter, TransportSession};
3
4use crate::route::{RouteHint, RouteResolver, RouteRule, RouteTrafficKind, resolve_with_rules};
5use crate::session::{ReconnectBackoffPolicy, SessionManager, SessionRecord};
6
7use super::MiddlewareStack;
8
9impl MiddlewareStack {
10    pub fn open_session(
11        &mut self,
12        domain: TransportDomain,
13        target: impl Into<String>,
14    ) -> SessionId {
15        self.session_manager.open_session(domain, target)
16    }
17
18    pub fn resolve_route(&self, endpoints: &[Endpoint], hint: &RouteHint) -> Option<Endpoint> {
19        resolve_with_rules(endpoints, &self.route_rules, hint)
20            .or_else(|| self.route_resolver.resolve(endpoints, hint))
21    }
22
23    pub fn add_route_rule(&mut self, rule: RouteRule) {
24        self.route_rules.push(rule);
25    }
26
27    pub fn resolve_domain_for(
28        &self,
29        target: &str,
30        traffic_kind: RouteTrafficKind,
31    ) -> TransportDomain {
32        let endpoints = self
33            .endpoint_entries()
34            .into_iter()
35            .map(|entry| entry.endpoint)
36            .collect::<Vec<_>>();
37        if endpoints.is_empty() {
38            return TransportDomain::Local;
39        }
40
41        for preferred in [TransportDomain::Local, TransportDomain::Network] {
42            let hint = RouteHint {
43                preferred_domain: preferred,
44                labels: Vec::new(),
45                target_name: Some(target.to_string()),
46                traffic_kind: Some(traffic_kind),
47            };
48            if let Some(endpoint) = self.resolve_route(&endpoints, &hint) {
49                return endpoint_domain(&endpoint);
50            }
51        }
52
53        endpoint_domain(&endpoints[0])
54    }
55
56    pub fn set_namespace_isolation(&mut self, enabled: bool) {
57        self.namespace_isolation = enabled;
58    }
59
60    pub fn namespace_isolation(&self) -> bool {
61        self.namespace_isolation
62    }
63
64    pub fn connect_via<A: TransportAdapter>(
65        &mut self,
66        adapter: &mut A,
67        endpoints: &[Endpoint],
68        hint: &RouteHint,
69        target: impl Into<String>,
70    ) -> Result<Box<dyn TransportSession>, RtError> {
71        let endpoint = self.resolve_route(endpoints, hint).ok_or_else(|| {
72            RtError::new(
73                ErrorCode::NotFound,
74                ErrorDomain::Core,
75                false,
76                "no route resolved",
77            )
78        })?;
79
80        let session_id = self.open_session(adapter.domain(), target);
81        match adapter.connect(endpoint) {
82            Ok(session) => {
83                self.session_manager.mark_connected(session_id);
84                Ok(session)
85            }
86            Err(err) => {
87                self.session_manager
88                    .mark_reconnecting(session_id, format!("connect error: {}", err));
89                Err(err)
90            }
91        }
92    }
93
94    pub fn close_session(&mut self, session_id: SessionId) {
95        self.session_manager.mark_closed(session_id);
96    }
97
98    pub fn mark_session_connected(&mut self, session_id: SessionId) {
99        self.session_manager.mark_connected(session_id);
100    }
101
102    pub fn session_record(&self, session_id: SessionId) -> Option<&SessionRecord> {
103        self.session_manager.get_session(session_id)
104    }
105
106    pub fn set_reconnect_backoff_policy(&mut self, policy: ReconnectBackoffPolicy) {
107        self.session_manager.set_backoff_policy(policy);
108    }
109}
110
111fn endpoint_domain(endpoint: &Endpoint) -> TransportDomain {
112    match endpoint.scheme {
113        transport_core::EndpointScheme::Udp
114        | transport_core::EndpointScheme::Tcp
115        | transport_core::EndpointScheme::Quic => TransportDomain::Network,
116        transport_core::EndpointScheme::Ipc
117        | transport_core::EndpointScheme::SharedMemory
118        | transport_core::EndpointScheme::Custom(_) => TransportDomain::Local,
119    }
120}