1pub mod link;
2
3use super::config::NodeConfig;
4use crate::actors::client_session::SessionAcl;
5use crate::actors::router::Router;
6use crate::actors::supervisor::Supervisor;
7use crate::info;
8use anyhow::Error;
9use async_trait::async_trait;
10use derive_more::From;
11use meio::{
12 Actor, Address, Context, Eliminated, IdOf, InteractionDone, InterruptedBy, StartedBy, Tag,
13 TaskError,
14};
15use meio_connect::server::{link::WaitForAddress, HttpServer, HttpServerLink};
16use rill_engine::{EngineConfig, RillEngine};
17use std::net::SocketAddr;
18use strum::{EnumIter, IntoEnumIterator};
19
20#[derive(Debug, From)]
21pub struct NodeLink<T: Supervisor> {
22 address: Address<Node<T>>,
23}
24
25impl<T: Supervisor> Clone for NodeLink<T> {
26 fn clone(&self) -> Self {
27 Self {
28 address: self.address.clone(),
29 }
30 }
31}
32
33pub struct Node<T: Supervisor> {
34 config: NodeConfig,
35 external_server: Option<HttpServerLink>,
36 internal_server: Option<HttpServerLink>,
37 supervisor: Address<T>,
38 router: Option<Address<Router<T>>>,
40 global_acl: SessionAcl,
41}
42
43impl<T: Supervisor> Node<T> {
44 pub fn new(config: NodeConfig, supervisor: Address<T>, global_acl: SessionAcl) -> Self {
45 Self {
46 config,
47 external_server: None,
48 internal_server: None,
49 supervisor,
50 router: None,
51 global_acl,
52 }
53 }
54
55 pub fn router(&mut self) -> Result<&mut Address<Router<T>>, Error> {
56 self.router
57 .as_mut()
58 .ok_or_else(|| Error::msg("Router lost"))
59 }
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
63pub enum Group {
64 App,
65 External,
66 Tracer,
67 Internal,
68 Router,
69 Service,
70}
71
72#[async_trait]
73impl<T: Supervisor> Actor for Node<T> {
74 type GroupBy = Group;
75
76 fn name(&self) -> String {
77 "Node".into()
78 }
79}
80
81#[async_trait]
82impl<T: Supervisor> StartedBy<T> for Node<T> {
83 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
84 ctx.termination_sequence(Group::iter().collect());
85 log::info!("Starting the node...");
86
87 info::TRACERS.touch();
88
89 log::info!("Starting internal server...");
90 let http_server = HttpServer::new(self.config.internal_address());
91 let internal_address = ctx.spawn_actor(http_server, Group::Internal);
92 let internal_link: HttpServerLink = internal_address.link();
93 let wait_addr = internal_link.wait_for_address();
94 ctx.track_interaction(wait_addr, Internal, Group::Service);
95 self.internal_server = Some(internal_link);
96
97 log::info!("Starting external server...");
98 let http_server = HttpServer::new(self.config.external_address());
99 let external_address = ctx.spawn_actor(http_server, Group::External);
100 let external_link: HttpServerLink = external_address.link();
101 let wait_addr = external_link.wait_for_address();
102 ctx.track_interaction(wait_addr, External, Group::Service);
103 self.external_server = Some(external_link);
104
105 log::info!("Starting router...");
107 let router = Router::new(
108 self.supervisor.link(),
109 external_address.link(),
110 self.config.external_address().port(),
111 internal_address.link(),
112 self.global_acl.clone(),
113 );
114 let router_addr = ctx.spawn_actor(router, Group::Router);
115 self.router = Some(router_addr);
116
117 Ok(())
118 }
119}
120
121#[async_trait]
122impl<T: Supervisor> InterruptedBy<T> for Node<T> {
123 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
124 log::info!("Terminating the node...");
125 ctx.shutdown();
126 Ok(())
127 }
128}
129
130#[async_trait]
131impl<T: Supervisor> Eliminated<HttpServer> for Node<T> {
132 async fn handle(
133 &mut self,
134 _id: IdOf<HttpServer>,
135 _ctx: &mut Context<Self>,
136 ) -> Result<(), Error> {
137 log::info!("HttpServer finished");
138 Ok(())
139 }
140}
141
142#[async_trait]
143impl<T: Supervisor> Eliminated<Router<T>> for Node<T> {
144 async fn handle(
145 &mut self,
146 _id: IdOf<Router<T>>,
147 _ctx: &mut Context<Self>,
148 ) -> Result<(), Error> {
149 log::info!("Router finished");
150 Ok(())
151 }
152}
153
154struct External;
155
156impl Tag for External {}
157
158#[async_trait]
159impl<T: Supervisor> InteractionDone<WaitForAddress, External> for Node<T> {
160 async fn handle(
161 &mut self,
162 _tag: External,
163 _addr: SocketAddr,
164 _ctx: &mut Context<Self>,
165 ) -> Result<(), Error> {
166 Ok(())
167 }
168
169 async fn failed(
170 &mut self,
171 _tag: External,
172 err: TaskError,
173 ctx: &mut Context<Self>,
174 ) -> Result<(), Error> {
175 log::error!("Can't wait for an external server: {}", err);
176 ctx.shutdown();
177 Ok(())
178 }
179}
180
181struct Internal;
182
183impl Tag for Internal {}
184
185#[async_trait]
186impl<T: Supervisor> InteractionDone<WaitForAddress, Internal> for Node<T> {
187 async fn handle(
188 &mut self,
189 _tag: Internal,
190 _addr: SocketAddr,
191 ctx: &mut Context<Self>,
192 ) -> Result<(), Error> {
193 let config = EngineConfig {
194 node: None,
195 name: Some(self.global_acl.id().clone()),
196 provider_type: "server-info".into(),
198 };
199 let engine = RillEngine::new(config);
200 ctx.spawn_actor(engine, Group::Tracer);
201 Ok(())
202 }
203
204 async fn failed(
205 &mut self,
206 _tag: Internal,
207 err: TaskError,
208 ctx: &mut Context<Self>,
209 ) -> Result<(), Error> {
210 log::error!("Can't wait for an internal server: {}", err);
211 ctx.shutdown();
212 Ok(())
213 }
214}
215
216#[async_trait]
217impl<T: Supervisor> Eliminated<RillEngine> for Node<T> {
218 async fn handle(
219 &mut self,
220 _id: IdOf<RillEngine>,
221 _ctx: &mut Context<Self>,
222 ) -> Result<(), Error> {
223 log::info!("RillEngine finished");
224 Ok(())
225 }
226}