ockam/remote/
lifecycle.rs1use crate::remote::{Addresses, RemoteRelay, RemoteRelayInfo, RemoteRelayOptions};
2use crate::Context;
3use ockam_core::compat::string::{String, ToString};
4use ockam_core::compat::sync::Arc;
5use ockam_core::flow_control::FlowControlId;
6use ockam_core::{
7 AllowAll, AllowSourceAddress, DenyAll, Mailbox, Mailboxes, OutgoingAccessControl, Result, Route,
8};
9use ockam_node::WorkerBuilder;
10use tracing::debug;
11
12#[derive(Clone, Copy)]
13pub(super) enum RelayType {
14 Static,
15 Ephemeral,
16}
17
18impl RelayType {
19 pub fn str(&self) -> &'static str {
20 match self {
21 RelayType::Static => "static",
22 RelayType::Ephemeral => "ephemeral",
23 }
24 }
25}
26
27impl RemoteRelay {
28 fn mailboxes(
29 addresses: Addresses,
30 outgoing_access_control: Arc<dyn OutgoingAccessControl>,
31 ) -> Mailboxes {
32 let main_internal = Mailbox::new(
33 addresses.main_internal,
34 None,
35 Arc::new(DenyAll),
36 outgoing_access_control,
37 );
38
39 let main_remote = Mailbox::new(
40 addresses.main_remote,
41 None,
42 Arc::new(AllowAll),
43 Arc::new(AllowAll),
44 );
45
46 Mailboxes::new(main_internal, vec![main_remote])
47 }
48}
49
50impl RemoteRelay {
51 fn new(
52 addresses: Addresses,
53 registration_route: Route,
54 registration_payload: String,
55 flow_control_id: Option<FlowControlId>,
56 ) -> Self {
57 Self {
58 addresses,
59 completion_msg_sent: false,
60 registration_route,
61 registration_payload,
62 flow_control_id,
63 }
64 }
65
66 pub async fn create_static(
68 ctx: &Context,
69 orchestrator_route: impl Into<Route>,
70 alias: impl Into<String>,
71 options: RemoteRelayOptions,
72 ) -> Result<RemoteRelayInfo> {
73 let addresses = Addresses::generate(RelayType::Static);
74
75 let mut callback_ctx = ctx.new_detached_with_mailboxes(Mailboxes::primary(
76 addresses.completion_callback.clone(),
77 Arc::new(AllowSourceAddress(addresses.main_remote.clone())),
78 Arc::new(DenyAll),
79 ))?;
80
81 let registration_route = orchestrator_route.into() + "static_forwarding_service";
82
83 let flow_control_id =
84 options.setup_flow_control(ctx.flow_controls(), &addresses, registration_route.next()?);
85 let outgoing_access_control =
86 options.create_access_control(ctx.flow_controls(), flow_control_id.clone());
87
88 let relay = Self::new(
89 addresses.clone(),
90 registration_route,
91 alias.into(),
92 flow_control_id,
93 );
94
95 debug!("Starting static RemoteRelay at {}", &addresses.main_remote);
96 let mailboxes = Self::mailboxes(addresses, outgoing_access_control);
97 WorkerBuilder::new(relay)
98 .with_mailboxes(mailboxes)
99 .start(ctx)?;
100
101 let resp = callback_ctx
102 .receive::<RemoteRelayInfo>()
103 .await?
104 .into_body()?;
105
106 Ok(resp)
107 }
108
109 pub async fn create(
111 ctx: &Context,
112 orchestrator_route: impl Into<Route>,
113 options: RemoteRelayOptions,
114 ) -> Result<RemoteRelayInfo> {
115 let addresses = Addresses::generate(RelayType::Ephemeral);
116
117 let mut callback_ctx = ctx.new_detached_with_mailboxes(Mailboxes::primary(
118 addresses.completion_callback.clone(),
119 Arc::new(AllowSourceAddress(addresses.main_remote.clone())),
120 Arc::new(DenyAll),
121 ))?;
122
123 let registration_route = orchestrator_route.into() + "forwarding_service";
124
125 let flow_control_id =
126 options.setup_flow_control(ctx.flow_controls(), &addresses, registration_route.next()?);
127 let outgoing_access_control =
128 options.create_access_control(ctx.flow_controls(), flow_control_id.clone());
129
130 let relay = Self::new(
131 addresses.clone(),
132 registration_route,
133 "register".to_string(),
134 flow_control_id,
135 );
136
137 debug!(
138 "Starting ephemeral RemoteRelay at {}",
139 &addresses.main_internal
140 );
141 let mailboxes = Self::mailboxes(addresses, outgoing_access_control);
142 WorkerBuilder::new(relay)
143 .with_mailboxes(mailboxes)
144 .start(ctx)?;
145
146 let resp = callback_ctx
147 .receive::<RemoteRelayInfo>()
148 .await?
149 .into_body()?;
150
151 Ok(resp)
152 }
153}