Skip to main content

rate_core/actors/router/
actor.rs

1mod external;
2mod internal;
3pub mod limits;
4
5use crate::actors::client_session::{ClientSession, SessionAcl};
6use crate::actors::node::Node;
7use crate::actors::provider_session::ProviderSession;
8use crate::actors::supervisor::{Supervisor, SupervisorLink};
9use crate::connection_limiter::ConnectionLimiter;
10use crate::registry::Registry;
11use anyhow::Error;
12use async_trait::async_trait;
13use meio::{Actor, Context, InterruptedBy, StartedBy};
14use meio_connect::server::HttpServerLink;
15use strum::{EnumIter, IntoEnumIterator};
16
17pub struct Router<T: Supervisor> {
18    external_server: HttpServerLink,
19    external_port: u16,
20    internal_server: HttpServerLink,
21    registry: Registry,
22    global_acl: SessionAcl,
23
24    supervisor: SupervisorLink<T>,
25
26    active_providers: ConnectionLimiter<ProviderSession>,
27    active_clients: ConnectionLimiter<ClientSession<T>>,
28}
29
30impl<T: Supervisor> Router<T> {
31    pub fn new(
32        supervisor: SupervisorLink<T>,
33        external_server: HttpServerLink,
34        external_port: u16,
35        internal_server: HttpServerLink,
36        global_acl: SessionAcl,
37    ) -> Self {
38        Self {
39            external_server,
40            external_port,
41            internal_server,
42            registry: Registry::new(),
43            global_acl,
44            supervisor,
45            // TODO: Add GlobalLimitController
46            active_providers: ConnectionLimiter::new(),
47            active_clients: ConnectionLimiter::new(),
48        }
49    }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
53pub enum Group {
54    Externals,
55    Internals,
56    Fetchers,
57}
58
59impl<T: Supervisor> Actor for Router<T> {
60    type GroupBy = Group;
61
62    fn name(&self) -> String {
63        "Router".into()
64    }
65}
66
67#[async_trait]
68impl<T: Supervisor> StartedBy<Node<T>> for Router<T> {
69    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
70        ctx.termination_sequence(Group::iter().collect());
71        self.init_internal(ctx).await?;
72        self.init_external(ctx).await?;
73        Ok(())
74    }
75}
76
77#[async_trait]
78impl<T: Supervisor> InterruptedBy<Node<T>> for Router<T> {
79    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
80        ctx.shutdown();
81        Ok(())
82    }
83}