rill_export/publishers/
graphite.rs

1use super::{Observer, Publisher, SharedRecord};
2use crate::actors::export::RillExport;
3use crate::config::GraphiteConfig;
4use anyhow::Error;
5use async_trait::async_trait;
6use meio::{
7    task::{HeartBeat, Tick},
8    ActionHandler, Actor, Context, IdOf, InterruptedBy, LiteTask, StartedBy, TaskEliminated,
9    TaskError,
10};
11use meio_connect::server::HttpServerLink;
12use rill_client::actors::broadcaster::{BroadcasterLinkForClient, PathNotification};
13use rill_client::actors::client::ClientLink;
14use rill_protocol::io::provider::{Path, PathPattern};
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::io::Write;
18use std::time::Duration;
19use tokio::io::AsyncWriteExt;
20use tokio::net::TcpStream;
21use tokio::sync::broadcast;
22
23pub struct GraphitePublisher {
24    config: GraphiteConfig,
25    broadcaster: BroadcasterLinkForClient,
26    client: ClientLink,
27    pickled: bool,
28    metrics: HashMap<Path, SharedRecord>,
29    sender: broadcast::Sender<Vec<u8>>,
30}
31
32impl Publisher for GraphitePublisher {
33    type Config = GraphiteConfig;
34
35    fn create(
36        config: Self::Config,
37        broadcaster: BroadcasterLinkForClient,
38        client: ClientLink,
39        _server: &HttpServerLink,
40    ) -> Self {
41        let (sender, _rx) = broadcast::channel(32);
42        Self {
43            config,
44            broadcaster,
45            client,
46            pickled: true, // TODO: Get from the config
47            metrics: HashMap::new(),
48            sender,
49        }
50    }
51}
52
53impl GraphitePublisher {
54    async fn graceful_shutdown(&mut self, ctx: &mut Context<Self>) {
55        // TODO: Do this for the client
56        //self.broadcaster.unsubscribe_all(ctx.address()).await.ok();
57        ctx.shutdown();
58    }
59}
60
61#[derive(Debug, Clone, PartialEq, Eq, Hash)]
62pub enum Group {
63    HeartBeat,
64    Streams,
65    Connection,
66}
67
68impl Actor for GraphitePublisher {
69    type GroupBy = Group;
70}
71
72#[async_trait]
73impl StartedBy<RillExport> for GraphitePublisher {
74    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
75        ctx.termination_sequence(vec![Group::HeartBeat, Group::Streams, Group::Connection]);
76        let interval = self.config.interval.unwrap_or(1_000);
77        let duration = Duration::from_millis(interval);
78        let heartbeat = HeartBeat::new(duration, ctx.address().clone());
79        ctx.spawn_task(heartbeat, Group::HeartBeat);
80        let connection = Connection::new(self.sender.clone());
81        ctx.spawn_task(connection, Group::Connection);
82        self.broadcaster
83            .subscribe_to_struct_changes(ctx.address().clone())
84            .await?;
85        Ok(())
86    }
87}
88
89#[async_trait]
90impl InterruptedBy<RillExport> for GraphitePublisher {
91    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
92        self.graceful_shutdown(ctx).await;
93        Ok(())
94    }
95}
96
97#[async_trait]
98impl TaskEliminated<HeartBeat> for GraphitePublisher {
99    async fn handle(
100        &mut self,
101        _id: IdOf<HeartBeat>,
102        _result: Result<(), TaskError>,
103        ctx: &mut Context<Self>,
104    ) -> Result<(), Error> {
105        self.graceful_shutdown(ctx).await;
106        Ok(())
107    }
108}
109
110#[async_trait]
111impl TaskEliminated<Connection> for GraphitePublisher {
112    async fn handle(
113        &mut self,
114        _id: IdOf<Connection>,
115        _result: Result<(), TaskError>,
116        ctx: &mut Context<Self>,
117    ) -> Result<(), Error> {
118        self.graceful_shutdown(ctx).await;
119        Ok(())
120    }
121}
122
123#[async_trait]
124impl ActionHandler<Tick> for GraphitePublisher {
125    async fn handle(&mut self, _: Tick, _ctx: &mut Context<Self>) -> Result<(), Error> {
126        if self.sender.receiver_count() > 0 {
127            if self.pickled {
128                // Collect all metrics values into a pool
129                let mut pool = Vec::with_capacity(self.metrics.len());
130                for (path, record) in self.metrics.drain() {
131                    if let Some(record) = record.get().await {
132                        let ts = record.timestamp;
133                        let value = record.value;
134                        let line = (path.to_string(), (ts.as_secs(), value));
135                        log::trace!("Graphite export: {} - {}", path, value);
136                        pool.push(line);
137                    }
138                }
139                // Serialize with pickle
140                let mut buffer = Vec::new();
141                Write::write(&mut buffer, &0_u32.to_be_bytes())?;
142                serde_pickle::to_writer(&mut buffer, &pool, false)?;
143                let prefix_len = std::mem::size_of::<u32>();
144                let len: u32 = (buffer.len() - prefix_len).try_into()?;
145                buffer[0..prefix_len].copy_from_slice(&len.to_be_bytes());
146                self.sender.send(buffer).map_err(|_| {
147                    Error::msg("Can't send data to Graphite (no active connections)")
148                })?;
149            } else {
150                // TODO: Support the plain-text format
151            }
152        }
153        Ok(())
154    }
155}
156
157#[async_trait]
158impl ActionHandler<PathNotification> for GraphitePublisher {
159    async fn handle(
160        &mut self,
161        msg: PathNotification,
162        ctx: &mut Context<Self>,
163    ) -> Result<(), Error> {
164        match msg {
165            PathNotification::Paths { descriptions } => {
166                for description in descriptions {
167                    let path = &description.path;
168                    // TODO: Improve that... Maybe use `PatternMatcher` that wraps `HashSet` of `Patterns`
169                    let pattern = PathPattern { path: path.clone() };
170                    if self.config.paths.contains(&pattern) {
171                        let record = SharedRecord::new();
172                        self.metrics.insert(path.clone(), record.clone());
173                        let observer = Observer::new(description, self.client.clone(), record);
174                        ctx.spawn_task(observer, Group::Streams);
175                    }
176                }
177            }
178            PathNotification::Name { .. } => {}
179        }
180        Ok(())
181    }
182}
183
184#[async_trait]
185impl TaskEliminated<Observer> for GraphitePublisher {
186    async fn handle(
187        &mut self,
188        _id: IdOf<Observer>,
189        _result: Result<(), TaskError>,
190        _ctx: &mut Context<Self>,
191    ) -> Result<(), Error> {
192        Ok(())
193    }
194}
195
196struct Connection {
197    sender: broadcast::Sender<Vec<u8>>,
198}
199
200impl Connection {
201    fn new(sender: broadcast::Sender<Vec<u8>>) -> Self {
202        Self { sender }
203    }
204}
205
206#[async_trait]
207impl LiteTask for Connection {
208    type Output = ();
209
210    async fn repeatable_routine(&mut self) -> Result<Option<Self::Output>, Error> {
211        let mut rx = self.sender.subscribe();
212        // TODO: Make url configurable
213        loop {
214            // To reuse connection put this line up (outside of the loop and above `subscribe` call)
215            let mut socket = TcpStream::connect("127.0.0.1:2004").await?;
216            let data = rx.recv().await?;
217            socket.write_all(&data).await?;
218        }
219    }
220}