xray_tracing/
lib.rs

1#![feature(is_some_and)]
2#![warn(missing_docs)]
3//#![deny(warnings)]
4//! Provides a client interface for [AWS X-Ray](https://aws.amazon.com/xray/)
5
6use serde::Serialize;
7use std::{
8    env,
9    net::{SocketAddr, UdpSocket},
10    result::Result as StdResult,
11    sync::Arc,
12};
13
14mod epoch;
15mod error;
16mod header;
17mod hexbytes;
18mod lambda;
19pub mod segment;
20mod segment_id;
21mod trace_id;
22mod tracing;
23
24pub use crate::{
25    epoch::Seconds,
26    error::Error,
27    header::Header,
28    segment::*,
29    segment_id::SegmentId,
30    trace_id::TraceId,
31    tracing::XRaySubscriber,
32    tracing::aws_metadata,
33};
34
35/// Type alias for Results which may return `xray::Errors`
36pub type Result<T> = StdResult<T, Error>;
37
38/// X-Ray daemon client interface
39#[derive(Debug)]
40pub struct Client {
41    socket: Arc<UdpSocket>,
42}
43
44impl Default for Client {
45    /// Return a client configured to send trace data to an
46    /// address identified by a `AWS_XRAY_DAEMON_ADDRESS` env variable
47    /// or `127.0.0.1:2000`
48    fn default() -> Self {
49        // https://docs.aws.amazon.com/lambda/latest/dg/lambda-x-ray.html
50        // todo documment error handling
51        let addr: SocketAddr = env::var("AWS_XRAY_DAEMON_ADDRESS")
52            .ok()
53            .and_then(|value| value.parse::<SocketAddr>().ok())
54            .unwrap_or_else(|| {
55                log::trace!("No valid `AWS_XRAY_DAEMON_ADDRESS` env variable detected falling back on default: 127.0.0.1:2000");
56                ([127, 0, 0, 1], 2000).into()
57            });
58
59        Client::new(addr).expect("failed to connect to socket")
60    }
61}
62
63impl Client {
64    const HEADER: &'static [u8] = br#"{"format": "json", "version": 1}
65"#;
66
67    /// Return a new X-Ray client connected
68    /// to the provided `addr`
69    pub fn new(addr: SocketAddr) -> Result<Self> {
70        let socket = Arc::new(UdpSocket::bind(&[([0, 0, 0, 0], 0).into()][..])?);
71        socket.set_nonblocking(true)?;
72        socket.connect(&addr)?;
73        log::trace!("connecting to xray daemon {}", addr);
74        Ok(Client { socket })
75    }
76
77    #[inline]
78    fn packet<S>(data: S) -> Result<Vec<u8>>
79    where
80        S: Serialize,
81    {
82        let bytes = serde_json::to_vec(&data)?;
83        Ok([Self::HEADER, &bytes].concat())
84    }
85
86    /// send a segment to the xray daemon this client is connected to
87    pub fn send<S>(
88        &self,
89        data: &S,
90    ) -> Result<()>
91    where
92        S: Serialize,
93    {
94        log::trace!(
95            "sending trace data {}",
96            serde_json::to_string_pretty(&data).unwrap_or_default()
97        );
98        let out = self.socket.send(&Self::packet(data)?)?;
99        log::trace!("send? {:?}", out);
100        Ok(())
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use super::*;
107
108    #[test]
109    #[ignore]
110    fn client_can_send_data() {
111        env_logger::init();
112        let mut segment = Segment::begin(
113            "test-segment",
114            SegmentId::default(),
115            None,
116            TraceId::default(),
117        );
118        std::thread::sleep(std::time::Duration::from_secs(1));
119        segment.end();
120        if let Err(e) = Client::default().send(&segment) {
121            assert!(false, "failed to send data: {}", e)
122        }
123    }
124
125    #[test]
126    fn client_prefixes_packets_with_header() {
127        assert_eq!(
128            Client::packet(serde_json::json!({
129                "foo": "bar"
130            }))
131            .unwrap(),
132            br#"{"format": "json", "version": 1}
133{"foo":"bar"}"#
134                .to_vec()
135        )
136    }
137}