1use super::{Observer, Publisher, SharedRecord};
2use crate::actors::export::RillExport;
3use crate::config::GraphiteConfig;
4use anyhow::Error;
5use async_trait::async_trait;
6use meio::{
7 task::{HeartBeat, Tick},
8 ActionHandler, Actor, Context, IdOf, InterruptedBy, LiteTask, StartedBy, TaskEliminated,
9 TaskError,
10};
11use meio_connect::server::HttpServerLink;
12use rill_client::actors::broadcaster::{BroadcasterLinkForClient, PathNotification};
13use rill_client::actors::client::ClientLink;
14use rill_protocol::io::provider::{Path, PathPattern};
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::io::Write;
18use std::time::Duration;
19use tokio::io::AsyncWriteExt;
20use tokio::net::TcpStream;
21use tokio::sync::broadcast;
22
23pub struct GraphitePublisher {
24 config: GraphiteConfig,
25 broadcaster: BroadcasterLinkForClient,
26 client: ClientLink,
27 pickled: bool,
28 metrics: HashMap<Path, SharedRecord>,
29 sender: broadcast::Sender<Vec<u8>>,
30}
31
32impl Publisher for GraphitePublisher {
33 type Config = GraphiteConfig;
34
35 fn create(
36 config: Self::Config,
37 broadcaster: BroadcasterLinkForClient,
38 client: ClientLink,
39 _server: &HttpServerLink,
40 ) -> Self {
41 let (sender, _rx) = broadcast::channel(32);
42 Self {
43 config,
44 broadcaster,
45 client,
46 pickled: true, metrics: HashMap::new(),
48 sender,
49 }
50 }
51}
52
53impl GraphitePublisher {
54 async fn graceful_shutdown(&mut self, ctx: &mut Context<Self>) {
55 ctx.shutdown();
58 }
59}
60
/// Task groups of the publisher; they are also the entries of the
/// termination sequence registered when the actor starts.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum Group {
    /// The periodic heartbeat task.
    HeartBeat,
    /// The per-path `Observer` tasks.
    Streams,
    /// The task that writes payloads to the Graphite socket.
    Connection,
}
67
68impl Actor for GraphitePublisher {
69 type GroupBy = Group;
70}
71
72#[async_trait]
73impl StartedBy<RillExport> for GraphitePublisher {
74 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
75 ctx.termination_sequence(vec![Group::HeartBeat, Group::Streams, Group::Connection]);
76 let interval = self.config.interval.unwrap_or(1_000);
77 let duration = Duration::from_millis(interval);
78 let heartbeat = HeartBeat::new(duration, ctx.address().clone());
79 ctx.spawn_task(heartbeat, Group::HeartBeat);
80 let connection = Connection::new(self.sender.clone());
81 ctx.spawn_task(connection, Group::Connection);
82 self.broadcaster
83 .subscribe_to_struct_changes(ctx.address().clone())
84 .await?;
85 Ok(())
86 }
87}
88
89#[async_trait]
90impl InterruptedBy<RillExport> for GraphitePublisher {
91 async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
92 self.graceful_shutdown(ctx).await;
93 Ok(())
94 }
95}
96
97#[async_trait]
98impl TaskEliminated<HeartBeat> for GraphitePublisher {
99 async fn handle(
100 &mut self,
101 _id: IdOf<HeartBeat>,
102 _result: Result<(), TaskError>,
103 ctx: &mut Context<Self>,
104 ) -> Result<(), Error> {
105 self.graceful_shutdown(ctx).await;
106 Ok(())
107 }
108}
109
110#[async_trait]
111impl TaskEliminated<Connection> for GraphitePublisher {
112 async fn handle(
113 &mut self,
114 _id: IdOf<Connection>,
115 _result: Result<(), TaskError>,
116 ctx: &mut Context<Self>,
117 ) -> Result<(), Error> {
118 self.graceful_shutdown(ctx).await;
119 Ok(())
120 }
121}
122
123#[async_trait]
124impl ActionHandler<Tick> for GraphitePublisher {
125 async fn handle(&mut self, _: Tick, _ctx: &mut Context<Self>) -> Result<(), Error> {
126 if self.sender.receiver_count() > 0 {
127 if self.pickled {
128 let mut pool = Vec::with_capacity(self.metrics.len());
130 for (path, record) in self.metrics.drain() {
131 if let Some(record) = record.get().await {
132 let ts = record.timestamp;
133 let value = record.value;
134 let line = (path.to_string(), (ts.as_secs(), value));
135 log::trace!("Graphite export: {} - {}", path, value);
136 pool.push(line);
137 }
138 }
139 let mut buffer = Vec::new();
141 Write::write(&mut buffer, &0_u32.to_be_bytes())?;
142 serde_pickle::to_writer(&mut buffer, &pool, false)?;
143 let prefix_len = std::mem::size_of::<u32>();
144 let len: u32 = (buffer.len() - prefix_len).try_into()?;
145 buffer[0..prefix_len].copy_from_slice(&len.to_be_bytes());
146 self.sender.send(buffer).map_err(|_| {
147 Error::msg("Can't send data to Graphite (no active connections)")
148 })?;
149 } else {
150 }
152 }
153 Ok(())
154 }
155}
156
157#[async_trait]
158impl ActionHandler<PathNotification> for GraphitePublisher {
159 async fn handle(
160 &mut self,
161 msg: PathNotification,
162 ctx: &mut Context<Self>,
163 ) -> Result<(), Error> {
164 match msg {
165 PathNotification::Paths { descriptions } => {
166 for description in descriptions {
167 let path = &description.path;
168 let pattern = PathPattern { path: path.clone() };
170 if self.config.paths.contains(&pattern) {
171 let record = SharedRecord::new();
172 self.metrics.insert(path.clone(), record.clone());
173 let observer = Observer::new(description, self.client.clone(), record);
174 ctx.spawn_task(observer, Group::Streams);
175 }
176 }
177 }
178 PathNotification::Name { .. } => {}
179 }
180 Ok(())
181 }
182}
183
184#[async_trait]
185impl TaskEliminated<Observer> for GraphitePublisher {
186 async fn handle(
187 &mut self,
188 _id: IdOf<Observer>,
189 _result: Result<(), TaskError>,
190 _ctx: &mut Context<Self>,
191 ) -> Result<(), Error> {
192 Ok(())
193 }
194}
195
196struct Connection {
197 sender: broadcast::Sender<Vec<u8>>,
198}
199
200impl Connection {
201 fn new(sender: broadcast::Sender<Vec<u8>>) -> Self {
202 Self { sender }
203 }
204}
205
206#[async_trait]
207impl LiteTask for Connection {
208 type Output = ();
209
210 async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
211 let mut rx = self.sender.subscribe();
212 loop {
214 let mut socket = TcpStream::connect("127.0.0.1:2004").await?;
216 let data = rx.recv().await?;
217 socket.write_all(&data).await?;
218 }
219 }
220}