avassa_client/volga/
log_query.rs

1use crate::Result;
2use futures_util::{
3    stream::{Stream, StreamExt},
4    SinkExt,
5};
6use serde::Serialize;
7use tokio_tungstenite::tungstenite::Message as WSMessage;
8
/// Used to query logs 'since' a given amount of time.
///
/// Serialized as a string made of the amount followed by a one-letter unit
/// suffix (`s`, `m` or `h`); for example `Since::Minutes(5)` becomes `"5m"`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Since {
    /// Amount expressed in seconds (suffix `s`).
    Seconds(u64),
    /// Amount expressed in minutes (suffix `m`).
    Minutes(u64),
    /// Amount expressed in hours (suffix `h`).
    Hours(u64),
}
16
17impl serde::ser::Serialize for Since {
18    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
19    where
20        S: serde::ser::Serializer,
21    {
22        match self {
23            Self::Seconds(v) => serializer.serialize_str(&format!("{v}s")),
24            Self::Minutes(v) => serializer.serialize_str(&format!("{v}m")),
25            Self::Hours(v) => serializer.serialize_str(&format!("{v}h")),
26        }
27    }
28}
29
// Generates a consuming, builder-style setter for one optional field.
//
// `$field` is used both as the method name and as the struct field that is
// filled in, `$ty` is the borrowed argument type (converted with `.into()`
// before being stored) and `$summary` becomes the method's doc comment.
// Intended to be invoked inside `impl Query`.
macro_rules! query_setter {
    ($field:ident, $ty:ty, $summary:literal) => {
        #[doc = $summary]
        #[must_use]
        pub fn $field(self, $field: &$ty) -> Self {
            let mut updated = self;
            updated.$field = Some($field.into());
            updated
        }
    };
}
42
43/// Log query parameters
44#[derive(Serialize)]
45pub struct Query {
46    op: String,
47
48    #[serde(skip_serializing_if = "Option::is_none")]
49    since: Option<Since>,
50
51    #[serde(skip_serializing_if = "Option::is_none")]
52    name: Option<String>,
53
54    #[serde(skip_serializing_if = "Option::is_none")]
55    service: Option<String>,
56
57    #[serde(skip_serializing_if = "Option::is_none")]
58    ix: Option<u32>,
59
60    #[serde(skip_serializing_if = "Option::is_none")]
61    application: Option<String>,
62
63    #[serde(skip_serializing_if = "Option::is_none")]
64    search_error: Option<bool>,
65
66    #[serde(skip_serializing_if = "Option::is_none")]
67    re: Option<String>,
68
69    #[serde(skip_serializing_if = "Option::is_none")]
70    deep_re: Option<String>,
71
72    #[serde(skip_serializing_if = "Option::is_none")]
73    re_hits: Option<u64>,
74
75    #[serde(skip_serializing_if = "Option::is_none")]
76    count: Option<String>,
77
78    #[serde(skip_serializing_if = "Option::is_none")]
79    dc: Option<String>,
80}
81
82impl Query {
83    /// Create a query instance
84    #[must_use]
85    pub fn new() -> Self {
86        Self {
87            op: "query_logs".into(),
88            since: None,
89            name: None,
90            service: None,
91            ix: None,
92            application: None,
93            search_error: None,
94            re: None,
95            deep_re: None,
96            re_hits: None,
97            count: None,
98            dc: None,
99        }
100    }
101
102    /// The name of the docker image
103    #[must_use]
104    pub fn image_name(self, image_name: &str) -> Self {
105        Self {
106            application: Some(image_name.into()),
107            ..self
108        }
109    }
110
111    /// When we have multiple replicas, by deafult all replicated
112    /// logs are read and merged, if we wish to read only one
113    /// replica log, we can indicate which replica index to follow
114    #[must_use]
115    pub fn replica_index(self, ix: u32) -> Self {
116        Self {
117            ix: Some(ix),
118            ..self
119        }
120    }
121
122    query_setter!(application, str, "Filter on application name");
123
124    ///Get logs since
125    #[must_use]
126    pub fn since(self, since: &Since) -> Self {
127        Self {
128            since: Some(since.clone()),
129            ..self
130        }
131    }
132
133    query_setter!(service, str, "Filter on service name");
134
135    query_setter!(dc, str, "Filter on datacenter name");
136
137    query_setter!(re, str, "Merge all logs and search the merged result for the provided perl regular expression. Drop all data until a regular expression matches");
138
139    query_setter!(deep_re, str, "Evaluate the regular expression on all nodes where the containers run, for each node, drop all data until regular expression matches.");
140
141    query_setter!(
142        count,
143        str,
144        "Count the number of matching regular expressions"
145    );
146
147    /// This is a shorthand to search for the first error in all
148    /// logs. Can be combined with [`Self::since`] and [`Self::re_hits`]
149    #[must_use]
150    pub fn search_error(self) -> Self {
151        Self {
152            search_error: Some(true),
153            ..self
154        }
155    }
156
157    /// With either of the regular expression searches, continue
158    /// to drop data until `re_hits` log entries have matched
159    #[must_use]
160    pub fn re_hits(self, re_hits: u64) -> Self {
161        Self {
162            re_hits: Some(re_hits),
163            ..self
164        }
165    }
166}
167
/// Stream for query results
///
/// Owns the websocket on which the query was sent. Results can be read one
/// at a time with `recv` or by polling the type as a [`Stream`].
#[pin_project::pin_project]
pub struct QueryStream {
    // Underlying websocket connection. It is not marked `#[pin]`, so the
    // projection hands it out as a plain `&mut` reference.
    ws: super::WebSocketStream,
}
173
174impl Default for Query {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180impl QueryStream {
181    pub(crate) async fn new(avassa_client: &crate::Client, query: &Query) -> Result<Self> {
182        let ws_uri = avassa_client
183            .websocket_url
184            .join("volga")?
185            .to_string()
186            .parse()?;
187        let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
188            .with_header(
189                "Authorization",
190                format!("Bearer {}", avassa_client.bearer_token().await),
191            );
192        let tls = avassa_client.open_tls_stream().await?;
193        let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
194
195        let json = serde_json::to_string_pretty(&query)?;
196        tracing::debug!("{}", json);
197
198        ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
199            .await?;
200
201        Ok(Self { ws })
202    }
203
204    /// Try to read one message
205    pub async fn recv(&mut self) -> Result<Option<String>> {
206        match self.ws.next().await {
207            Some(Ok(val)) => match val {
208                WSMessage::Binary(v) => Ok(Some(String::from_utf8_lossy(&v).to_string())),
209                _ => unreachable!(),
210            },
211            Some(Err(e)) => Err(e.into()),
212            None => Ok(None),
213        }
214    }
215}
216
217impl Stream for QueryStream {
218    type Item = crate::Result<String>;
219
220    fn poll_next(
221        self: std::pin::Pin<&mut Self>,
222        cx: &mut core::task::Context<'_>,
223    ) -> core::task::Poll<Option<Self::Item>> {
224        let mut this = self.project();
225
226        match core::pin::Pin::new(&mut this.ws).poll_next(cx) {
227            core::task::Poll::Ready(val) => {
228                let res: Option<Self::Item> = match val {
229                    Some(Ok(WSMessage::Binary(m))) => Some(Ok(String::from_utf8_lossy(&m).into())),
230                    Some(Ok(msg)) => Some(Err(crate::Error::Volga(Some(format!(
231                        "Unexpected message ({msg:?})",
232                    ))))),
233                    Some(Err(e)) => Some(Err(e.into())),
234                    None => None,
235                };
236
237                core::task::Poll::Ready(res)
238            }
239            core::task::Poll::Pending => core::task::Poll::Pending,
240        }
241    }
242}
243
#[cfg(test)]
mod test {
    use super::Query;

    /// The macro-generated `application` setter stores the value it is given.
    #[test]
    fn setter() {
        let query = Query::new().application("foo");
        assert_eq!(query.application.as_deref(), Some("foo"));
    }
}