rill_export/actors/export/
actor.rs

1use crate::config::ExportConfig;
2use crate::publishers::{self, Publisher};
3use anyhow::Error;
4use async_trait::async_trait;
5use meio::{Actor, Address, Context, Eliminated, IdOf, InterruptedBy, StartedBy};
6use meio_connect::server::HttpServerLink;
7use rill_client::actors::broadcaster::{Broadcaster, BroadcasterLinkForClient};
8use rill_client::actors::client::{ClientLink, RillClient};
9
/// Supervision groups used by [`RillExport`] when spawning child actors.
///
/// The exporter lists `Publishers` before `Middleware` in its termination
/// sequence.
///
/// The enum is fieldless, so it is `Copy`: group tags can be passed around by
/// value without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Group {
    /// Group of the `Broadcaster` and `RillClient` actors.
    Middleware,
    /// Group of the publisher actors.
    Publishers,
}
15
16pub struct RillExport {
17    config: ExportConfig,
18    client: Option<Address<RillClient>>,
19    broadcaster: Option<Address<Broadcaster>>,
20    /// It used to bind publishers that require to have an HTTP endpoint.
21    /// Like `Prometheus` publisher.
22    server: HttpServerLink,
23}
24
25impl RillExport {
26    pub fn new(config: ExportConfig, server: HttpServerLink) -> Self {
27        Self {
28            config,
29            client: None,
30            broadcaster: None,
31            server,
32        }
33    }
34
35    fn get_broadcaster(&self) -> Result<BroadcasterLinkForClient, Error> {
36        self.broadcaster
37            .clone()
38            .map(BroadcasterLinkForClient::from)
39            .ok_or_else(|| Error::msg("No broadcaster attached to RillExport"))
40    }
41
42    fn get_client(&self) -> Result<ClientLink, Error> {
43        self.client
44            .clone()
45            .map(ClientLink::from)
46            .ok_or_else(|| Error::msg("No broadcaster attached to RillExport"))
47    }
48
49    fn spawn_publisher<T: Publisher>(
50        &mut self,
51        config: T::Config,
52        ctx: &mut Context<Self>,
53    ) -> Result<(), Error> {
54        let broadcaster = self.get_broadcaster()?;
55        let client = self.get_client()?;
56        let publisher = T::create(config, broadcaster, client, &self.server);
57        ctx.spawn_actor(publisher, Group::Publishers);
58        Ok(())
59    }
60}
61
62impl Actor for RillExport {
63    type GroupBy = Group;
64}
65
66#[async_trait]
67impl<T: Actor> StartedBy<T> for RillExport {
68    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
69        ctx.termination_sequence(vec![Group::Publishers, Group::Middleware]);
70        let url = self.config.node_url();
71        let actor = Broadcaster::new();
72        let broadcaster = ctx.spawn_actor(actor, Group::Middleware);
73        let link = broadcaster.link();
74        self.broadcaster = Some(broadcaster);
75
76        let actor = RillClient::new(url, link);
77        let client = ctx.spawn_actor(actor, Group::Middleware);
78        self.client = Some(client);
79
80        if let Some(config) = self.config.prometheus.take() {
81            self.spawn_publisher::<publishers::PrometheusPublisher>(config, ctx)?;
82        }
83        if let Some(config) = self.config.graphite.take() {
84            self.spawn_publisher::<publishers::GraphitePublisher>(config, ctx)?;
85        }
86
87        Ok(())
88    }
89}
90
91#[async_trait]
92impl<T: Actor> InterruptedBy<T> for RillExport {
93    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
94        ctx.shutdown();
95        Ok(())
96    }
97}
98
99#[async_trait]
100impl Eliminated<Broadcaster> for RillExport {
101    async fn handle(
102        &mut self,
103        _id: IdOf<Broadcaster>,
104        _ctx: &mut Context<Self>,
105    ) -> Result<(), Error> {
106        Ok(())
107    }
108}
109
110#[async_trait]
111impl Eliminated<RillClient> for RillExport {
112    async fn handle(
113        &mut self,
114        _id: IdOf<RillClient>,
115        _ctx: &mut Context<Self>,
116    ) -> Result<(), Error> {
117        Ok(())
118    }
119}
120
121#[async_trait]
122impl<T: Publisher> Eliminated<T> for RillExport {
123    async fn handle(&mut self, _id: IdOf<T>, _ctx: &mut Context<Self>) -> Result<(), Error> {
124        Ok(())
125    }
126}