use core_types::{ErrorCode, ErrorDomain, RtError, SessionId, TransportDomain};
use transport_core::{Endpoint, TransportAdapter, TransportSession};
use crate::route::{RouteHint, RouteResolver, RouteRule, RouteTrafficKind, resolve_with_rules};
use crate::session::{ReconnectBackoffPolicy, SessionManager, SessionRecord};
use super::MiddlewareStack;
impl MiddlewareStack {
pub fn open_session(
&mut self,
domain: TransportDomain,
target: impl Into<String>,
) -> SessionId {
self.session_manager.open_session(domain, target)
}
pub fn resolve_route(&self, endpoints: &[Endpoint], hint: &RouteHint) -> Option<Endpoint> {
resolve_with_rules(endpoints, &self.route_rules, hint)
.or_else(|| self.route_resolver.resolve(endpoints, hint))
}
pub fn add_route_rule(&mut self, rule: RouteRule) {
self.route_rules.push(rule);
}
pub fn resolve_domain_for(
&self,
target: &str,
traffic_kind: RouteTrafficKind,
) -> TransportDomain {
let endpoints = self
.endpoint_entries()
.into_iter()
.map(|entry| entry.endpoint)
.collect::<Vec<_>>();
if endpoints.is_empty() {
return TransportDomain::Local;
}
for preferred in [TransportDomain::Local, TransportDomain::Network] {
let hint = RouteHint {
preferred_domain: preferred,
labels: Vec::new(),
target_name: Some(target.to_string()),
traffic_kind: Some(traffic_kind),
};
if let Some(endpoint) = self.resolve_route(&endpoints, &hint) {
return endpoint_domain(&endpoint);
}
}
endpoint_domain(&endpoints[0])
}
pub fn set_namespace_isolation(&mut self, enabled: bool) {
self.namespace_isolation = enabled;
}
pub fn namespace_isolation(&self) -> bool {
self.namespace_isolation
}
pub fn connect_via<A: TransportAdapter>(
&mut self,
adapter: &mut A,
endpoints: &[Endpoint],
hint: &RouteHint,
target: impl Into<String>,
) -> Result<Box<dyn TransportSession>, RtError> {
let endpoint = self.resolve_route(endpoints, hint).ok_or_else(|| {
RtError::new(
ErrorCode::NotFound,
ErrorDomain::Core,
false,
"no route resolved",
)
})?;
let session_id = self.open_session(adapter.domain(), target);
match adapter.connect(endpoint) {
Ok(session) => {
self.session_manager.mark_connected(session_id);
Ok(session)
}
Err(err) => {
self.session_manager
.mark_reconnecting(session_id, format!("connect error: {}", err));
Err(err)
}
}
}
pub fn close_session(&mut self, session_id: SessionId) {
self.session_manager.mark_closed(session_id);
}
pub fn mark_session_connected(&mut self, session_id: SessionId) {
self.session_manager.mark_connected(session_id);
}
pub fn session_record(&self, session_id: SessionId) -> Option<&SessionRecord> {
self.session_manager.get_session(session_id)
}
pub fn set_reconnect_backoff_policy(&mut self, policy: ReconnectBackoffPolicy) {
self.session_manager.set_backoff_policy(policy);
}
}
fn endpoint_domain(endpoint: &Endpoint) -> TransportDomain {
match endpoint.scheme {
transport_core::EndpointScheme::Udp
| transport_core::EndpointScheme::Tcp
| transport_core::EndpointScheme::Quic => TransportDomain::Network,
transport_core::EndpointScheme::Ipc
| transport_core::EndpointScheme::SharedMemory
| transport_core::EndpointScheme::Custom(_) => TransportDomain::Local,
}
}