Skip to main content

rate_core/actors/node/
actor.rs

1pub mod link;
2
3use super::config::NodeConfig;
4use crate::actors::client_session::SessionAcl;
5use crate::actors::router::Router;
6use crate::actors::supervisor::Supervisor;
7use crate::info;
8use anyhow::Error;
9use async_trait::async_trait;
10use derive_more::From;
11use meio::{
12    Actor, Address, Context, Eliminated, IdOf, InteractionDone, InterruptedBy, StartedBy, Tag,
13    TaskError,
14};
15use meio_connect::server::{link::WaitForAddress, HttpServer, HttpServerLink};
16use rill_engine::{EngineConfig, RillEngine};
17use std::net::SocketAddr;
18use strum::{EnumIter, IntoEnumIterator};
19
20#[derive(Debug, From)]
21pub struct NodeLink<T: Supervisor> {
22    address: Address<Node<T>>,
23}
24
25impl<T: Supervisor> Clone for NodeLink<T> {
26    fn clone(&self) -> Self {
27        Self {
28            address: self.address.clone(),
29        }
30    }
31}
32
33pub struct Node<T: Supervisor> {
34    config: NodeConfig,
35    external_server: Option<HttpServerLink>,
36    internal_server: Option<HttpServerLink>,
37    supervisor: Address<T>,
38    // TODO: RouterLink here?
39    router: Option<Address<Router<T>>>,
40    global_acl: SessionAcl,
41}
42
43impl<T: Supervisor> Node<T> {
44    pub fn new(config: NodeConfig, supervisor: Address<T>, global_acl: SessionAcl) -> Self {
45        Self {
46            config,
47            external_server: None,
48            internal_server: None,
49            supervisor,
50            router: None,
51            global_acl,
52        }
53    }
54
55    pub fn router(&mut self) -> Result<&mut Address<Router<T>>, Error> {
56        self.router
57            .as_mut()
58            .ok_or_else(|| Error::msg("Router lost"))
59    }
60}
61
62#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
63pub enum Group {
64    App,
65    External,
66    Tracer,
67    Internal,
68    Router,
69    Service,
70}
71
72#[async_trait]
73impl<T: Supervisor> Actor for Node<T> {
74    type GroupBy = Group;
75
76    fn name(&self) -> String {
77        "Node".into()
78    }
79}
80
81#[async_trait]
82impl<T: Supervisor> StartedBy<T> for Node<T> {
83    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
84        ctx.termination_sequence(Group::iter().collect());
85        log::info!("Starting the node...");
86
87        info::TRACERS.touch();
88
89        log::info!("Starting internal server...");
90        let http_server = HttpServer::new(self.config.internal_address());
91        let internal_address = ctx.spawn_actor(http_server, Group::Internal);
92        let internal_link: HttpServerLink = internal_address.link();
93        let wait_addr = internal_link.wait_for_address();
94        ctx.track_interaction(wait_addr, Internal, Group::Service);
95        self.internal_server = Some(internal_link);
96
97        log::info!("Starting external server...");
98        let http_server = HttpServer::new(self.config.external_address());
99        let external_address = ctx.spawn_actor(http_server, Group::External);
100        let external_link: HttpServerLink = external_address.link();
101        let wait_addr = external_link.wait_for_address();
102        ctx.track_interaction(wait_addr, External, Group::Service);
103        self.external_server = Some(external_link);
104
105        // TODO: `Router` is not needed in the future... Look to the `AppBind`
106        log::info!("Starting router...");
107        let router = Router::new(
108            self.supervisor.link(),
109            external_address.link(),
110            self.config.external_address().port(),
111            internal_address.link(),
112            self.global_acl.clone(),
113        );
114        let router_addr = ctx.spawn_actor(router, Group::Router);
115        self.router = Some(router_addr);
116
117        Ok(())
118    }
119}
120
121#[async_trait]
122impl<T: Supervisor> InterruptedBy<T> for Node<T> {
123    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
124        log::info!("Terminating the node...");
125        ctx.shutdown();
126        Ok(())
127    }
128}
129
130#[async_trait]
131impl<T: Supervisor> Eliminated<HttpServer> for Node<T> {
132    async fn handle(
133        &mut self,
134        _id: IdOf<HttpServer>,
135        _ctx: &mut Context<Self>,
136    ) -> Result<(), Error> {
137        log::info!("HttpServer finished");
138        Ok(())
139    }
140}
141
142#[async_trait]
143impl<T: Supervisor> Eliminated<Router<T>> for Node<T> {
144    async fn handle(
145        &mut self,
146        _id: IdOf<Router<T>>,
147        _ctx: &mut Context<Self>,
148    ) -> Result<(), Error> {
149        log::info!("Router finished");
150        Ok(())
151    }
152}
153
154struct External;
155
156impl Tag for External {}
157
158#[async_trait]
159impl<T: Supervisor> InteractionDone<WaitForAddress, External> for Node<T> {
160    async fn handle(
161        &mut self,
162        _tag: External,
163        _addr: SocketAddr,
164        _ctx: &mut Context<Self>,
165    ) -> Result<(), Error> {
166        Ok(())
167    }
168
169    async fn failed(
170        &mut self,
171        _tag: External,
172        err: TaskError,
173        ctx: &mut Context<Self>,
174    ) -> Result<(), Error> {
175        log::error!("Can't wait for an external server: {}", err);
176        ctx.shutdown();
177        Ok(())
178    }
179}
180
181struct Internal;
182
183impl Tag for Internal {}
184
185#[async_trait]
186impl<T: Supervisor> InteractionDone<WaitForAddress, Internal> for Node<T> {
187    async fn handle(
188        &mut self,
189        _tag: Internal,
190        _addr: SocketAddr,
191        ctx: &mut Context<Self>,
192    ) -> Result<(), Error> {
193        let config = EngineConfig {
194            node: None,
195            name: Some(self.global_acl.id().clone()),
196            // TODO: Use `StreamType` from the special package
197            provider_type: "server-info".into(),
198        };
199        let engine = RillEngine::new(config);
200        ctx.spawn_actor(engine, Group::Tracer);
201        Ok(())
202    }
203
204    async fn failed(
205        &mut self,
206        _tag: Internal,
207        err: TaskError,
208        ctx: &mut Context<Self>,
209    ) -> Result<(), Error> {
210        log::error!("Can't wait for an internal server: {}", err);
211        ctx.shutdown();
212        Ok(())
213    }
214}
215
216#[async_trait]
217impl<T: Supervisor> Eliminated<RillEngine> for Node<T> {
218    async fn handle(
219        &mut self,
220        _id: IdOf<RillEngine>,
221        _ctx: &mut Context<Self>,
222    ) -> Result<(), Error> {
223        log::info!("RillEngine finished");
224        Ok(())
225    }
226}