rill_export/actors/export/
actor.rs1use crate::config::ExportConfig;
2use crate::publishers::{self, Publisher};
3use anyhow::Error;
4use async_trait::async_trait;
5use meio::{Actor, Address, Context, Eliminated, IdOf, InterruptedBy, StartedBy};
6use meio_connect::server::HttpServerLink;
7use rill_client::actors::broadcaster::{Broadcaster, BroadcasterLinkForClient};
8use rill_client::actors::client::{ClientLink, RillClient};
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11pub enum Group {
12 Middleware,
13 Publishers,
14}
15
16pub struct RillExport {
17 config: ExportConfig,
18 client: Option<Address<RillClient>>,
19 broadcaster: Option<Address<Broadcaster>>,
20 server: HttpServerLink,
23}
24
25impl RillExport {
26 pub fn new(config: ExportConfig, server: HttpServerLink) -> Self {
27 Self {
28 config,
29 client: None,
30 broadcaster: None,
31 server,
32 }
33 }
34
35 fn get_broadcaster(&self) -> Result<BroadcasterLinkForClient, Error> {
36 self.broadcaster
37 .clone()
38 .map(BroadcasterLinkForClient::from)
39 .ok_or_else(|| Error::msg("No broadcaster attached to RillExport"))
40 }
41
42 fn get_client(&self) -> Result<ClientLink, Error> {
43 self.client
44 .clone()
45 .map(ClientLink::from)
46 .ok_or_else(|| Error::msg("No broadcaster attached to RillExport"))
47 }
48
49 fn spawn_publisher<T: Publisher>(
50 &mut self,
51 config: T::Config,
52 ctx: &mut Context<Self>,
53 ) -> Result<(), Error> {
54 let broadcaster = self.get_broadcaster()?;
55 let client = self.get_client()?;
56 let publisher = T::create(config, broadcaster, client, &self.server);
57 ctx.spawn_actor(publisher, Group::Publishers);
58 Ok(())
59 }
60}
61
62impl Actor for RillExport {
63 type GroupBy = Group;
64}
65
66#[async_trait]
67impl<T: Actor> StartedBy<T> for RillExport {
68 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
69 ctx.termination_sequence(vec![Group::Publishers, Group::Middleware]);
70 let url = self.config.node_url();
71 let actor = Broadcaster::new();
72 let broadcaster = ctx.spawn_actor(actor, Group::Middleware);
73 let link = broadcaster.link();
74 self.broadcaster = Some(broadcaster);
75
76 let actor = RillClient::new(url, link);
77 let client = ctx.spawn_actor(actor, Group::Middleware);
78 self.client = Some(client);
79
80 if let Some(config) = self.config.prometheus.take() {
81 self.spawn_publisher::<publishers::PrometheusPublisher>(config, ctx)?;
82 }
83 if let Some(config) = self.config.graphite.take() {
84 self.spawn_publisher::<publishers::GraphitePublisher>(config, ctx)?;
85 }
86
87 Ok(())
88 }
89}
90
91#[async_trait]
92impl<T: Actor> InterruptedBy<T> for RillExport {
93 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
94 ctx.shutdown();
95 Ok(())
96 }
97}
98
99#[async_trait]
100impl Eliminated<Broadcaster> for RillExport {
101 async fn handle(
102 &mut self,
103 _id: IdOf<Broadcaster>,
104 _ctx: &mut Context<Self>,
105 ) -> Result<(), Error> {
106 Ok(())
107 }
108}
109
110#[async_trait]
111impl Eliminated<RillClient> for RillExport {
112 async fn handle(
113 &mut self,
114 _id: IdOf<RillClient>,
115 _ctx: &mut Context<Self>,
116 ) -> Result<(), Error> {
117 Ok(())
118 }
119}
120
121#[async_trait]
122impl<T: Publisher> Eliminated<T> for RillExport {
123 async fn handle(&mut self, _id: IdOf<T>, _ctx: &mut Context<Self>) -> Result<(), Error> {
124 Ok(())
125 }
126}