middleware_core/stack/
transport.rs1use core_types::{ErrorCode, ErrorDomain, RtError, SessionId, TransportDomain};
2use transport_core::{Endpoint, TransportAdapter, TransportSession};
3
4use crate::route::{RouteHint, RouteResolver, RouteRule, RouteTrafficKind, resolve_with_rules};
5use crate::session::{ReconnectBackoffPolicy, SessionManager, SessionRecord};
6
7use super::MiddlewareStack;
8
9impl MiddlewareStack {
10 pub fn open_session(
11 &mut self,
12 domain: TransportDomain,
13 target: impl Into<String>,
14 ) -> SessionId {
15 self.session_manager.open_session(domain, target)
16 }
17
18 pub fn resolve_route(&self, endpoints: &[Endpoint], hint: &RouteHint) -> Option<Endpoint> {
19 resolve_with_rules(endpoints, &self.route_rules, hint)
20 .or_else(|| self.route_resolver.resolve(endpoints, hint))
21 }
22
23 pub fn add_route_rule(&mut self, rule: RouteRule) {
24 self.route_rules.push(rule);
25 }
26
27 pub fn resolve_domain_for(
28 &self,
29 target: &str,
30 traffic_kind: RouteTrafficKind,
31 ) -> TransportDomain {
32 let endpoints = self
33 .endpoint_entries()
34 .into_iter()
35 .map(|entry| entry.endpoint)
36 .collect::<Vec<_>>();
37 if endpoints.is_empty() {
38 return TransportDomain::Local;
39 }
40
41 for preferred in [TransportDomain::Local, TransportDomain::Network] {
42 let hint = RouteHint {
43 preferred_domain: preferred,
44 labels: Vec::new(),
45 target_name: Some(target.to_string()),
46 traffic_kind: Some(traffic_kind),
47 };
48 if let Some(endpoint) = self.resolve_route(&endpoints, &hint) {
49 return endpoint_domain(&endpoint);
50 }
51 }
52
53 endpoint_domain(&endpoints[0])
54 }
55
56 pub fn set_namespace_isolation(&mut self, enabled: bool) {
57 self.namespace_isolation = enabled;
58 }
59
60 pub fn namespace_isolation(&self) -> bool {
61 self.namespace_isolation
62 }
63
64 pub fn connect_via<A: TransportAdapter>(
65 &mut self,
66 adapter: &mut A,
67 endpoints: &[Endpoint],
68 hint: &RouteHint,
69 target: impl Into<String>,
70 ) -> Result<Box<dyn TransportSession>, RtError> {
71 let endpoint = self.resolve_route(endpoints, hint).ok_or_else(|| {
72 RtError::new(
73 ErrorCode::NotFound,
74 ErrorDomain::Core,
75 false,
76 "no route resolved",
77 )
78 })?;
79
80 let session_id = self.open_session(adapter.domain(), target);
81 match adapter.connect(endpoint) {
82 Ok(session) => {
83 self.session_manager.mark_connected(session_id);
84 Ok(session)
85 }
86 Err(err) => {
87 self.session_manager
88 .mark_reconnecting(session_id, format!("connect error: {}", err));
89 Err(err)
90 }
91 }
92 }
93
94 pub fn close_session(&mut self, session_id: SessionId) {
95 self.session_manager.mark_closed(session_id);
96 }
97
98 pub fn mark_session_connected(&mut self, session_id: SessionId) {
99 self.session_manager.mark_connected(session_id);
100 }
101
102 pub fn session_record(&self, session_id: SessionId) -> Option<&SessionRecord> {
103 self.session_manager.get_session(session_id)
104 }
105
106 pub fn set_reconnect_backoff_policy(&mut self, policy: ReconnectBackoffPolicy) {
107 self.session_manager.set_backoff_policy(policy);
108 }
109}
110
111fn endpoint_domain(endpoint: &Endpoint) -> TransportDomain {
112 match endpoint.scheme {
113 transport_core::EndpointScheme::Udp
114 | transport_core::EndpointScheme::Tcp
115 | transport_core::EndpointScheme::Quic => TransportDomain::Network,
116 transport_core::EndpointScheme::Ipc
117 | transport_core::EndpointScheme::SharedMemory
118 | transport_core::EndpointScheme::Custom(_) => TransportDomain::Local,
119 }
120}