avassa_client/volga/
log_query.rs1use crate::Result;
2use futures_util::{
3 stream::{Stream, StreamExt},
4 SinkExt,
5};
6use serde::Serialize;
7use tokio_tungstenite::tungstenite::Message as WSMessage;
8
9#[derive(Clone)]
11pub enum Since {
12 Seconds(u64),
13 Minutes(u64),
14 Hours(u64),
15}
16
17impl serde::ser::Serialize for Since {
18 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
19 where
20 S: serde::ser::Serializer,
21 {
22 match self {
23 Self::Seconds(v) => serializer.serialize_str(&format!("{v}s")),
24 Self::Minutes(v) => serializer.serialize_str(&format!("{v}m")),
25 Self::Hours(v) => serializer.serialize_str(&format!("{v}h")),
26 }
27 }
28}
29
30macro_rules! query_setter {
31 ($name: ident, $type:ty, $doc:literal) => {
32 #[doc=$doc]
33 #[must_use]
34 pub fn $name(self, $name: &$type) -> Query {
35 Self {
36 $name: Some($name.into()),
37 ..self
38 }
39 }
40 };
41}
42
43#[derive(Serialize)]
45pub struct Query {
46 op: String,
47
48 #[serde(skip_serializing_if = "Option::is_none")]
49 since: Option<Since>,
50
51 #[serde(skip_serializing_if = "Option::is_none")]
52 name: Option<String>,
53
54 #[serde(skip_serializing_if = "Option::is_none")]
55 service: Option<String>,
56
57 #[serde(skip_serializing_if = "Option::is_none")]
58 ix: Option<u32>,
59
60 #[serde(skip_serializing_if = "Option::is_none")]
61 application: Option<String>,
62
63 #[serde(skip_serializing_if = "Option::is_none")]
64 search_error: Option<bool>,
65
66 #[serde(skip_serializing_if = "Option::is_none")]
67 re: Option<String>,
68
69 #[serde(skip_serializing_if = "Option::is_none")]
70 deep_re: Option<String>,
71
72 #[serde(skip_serializing_if = "Option::is_none")]
73 re_hits: Option<u64>,
74
75 #[serde(skip_serializing_if = "Option::is_none")]
76 count: Option<String>,
77
78 #[serde(skip_serializing_if = "Option::is_none")]
79 dc: Option<String>,
80}
81
82impl Query {
83 #[must_use]
85 pub fn new() -> Self {
86 Self {
87 op: "query_logs".into(),
88 since: None,
89 name: None,
90 service: None,
91 ix: None,
92 application: None,
93 search_error: None,
94 re: None,
95 deep_re: None,
96 re_hits: None,
97 count: None,
98 dc: None,
99 }
100 }
101
102 #[must_use]
104 pub fn image_name(self, image_name: &str) -> Self {
105 Self {
106 application: Some(image_name.into()),
107 ..self
108 }
109 }
110
111 #[must_use]
115 pub fn replica_index(self, ix: u32) -> Self {
116 Self {
117 ix: Some(ix),
118 ..self
119 }
120 }
121
122 query_setter!(application, str, "Filter on application name");
123
124 #[must_use]
126 pub fn since(self, since: &Since) -> Self {
127 Self {
128 since: Some(since.clone()),
129 ..self
130 }
131 }
132
133 query_setter!(service, str, "Filter on service name");
134
135 query_setter!(dc, str, "Filter on datacenter name");
136
137 query_setter!(re, str, "Merge all logs and search the merged result for the provided perl regular expression. Drop all data until a regular expression matches");
138
139 query_setter!(deep_re, str, "Evaluate the regular expression on all nodes where the containers run, for each node, drop all data until regular expression matches.");
140
141 query_setter!(
142 count,
143 str,
144 "Count the number of matching regular expressions"
145 );
146
147 #[must_use]
150 pub fn search_error(self) -> Self {
151 Self {
152 search_error: Some(true),
153 ..self
154 }
155 }
156
157 #[must_use]
160 pub fn re_hits(self, re_hits: u64) -> Self {
161 Self {
162 re_hits: Some(re_hits),
163 ..self
164 }
165 }
166}
167
168#[pin_project::pin_project]
170pub struct QueryStream {
171 ws: super::WebSocketStream,
172}
173
174impl Default for Query {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180impl QueryStream {
181 pub(crate) async fn new(avassa_client: &crate::Client, query: &Query) -> Result<Self> {
182 let ws_uri = avassa_client
183 .websocket_url
184 .join("volga")?
185 .to_string()
186 .parse()?;
187 let req_builder = tokio_tungstenite::tungstenite::client::ClientRequestBuilder::new(ws_uri)
188 .with_header(
189 "Authorization",
190 format!("Bearer {}", avassa_client.bearer_token().await),
191 );
192 let tls = avassa_client.open_tls_stream().await?;
193 let (mut ws, _) = tokio_tungstenite::client_async(req_builder, tls).await?;
194
195 let json = serde_json::to_string_pretty(&query)?;
196 tracing::debug!("{}", json);
197
198 ws.send(WSMessage::Binary(serde_json::to_vec(&query)?.into()))
199 .await?;
200
201 Ok(Self { ws })
202 }
203
204 pub async fn recv(&mut self) -> Result<Option<String>> {
206 match self.ws.next().await {
207 Some(Ok(val)) => match val {
208 WSMessage::Binary(v) => Ok(Some(String::from_utf8_lossy(&v).to_string())),
209 _ => unreachable!(),
210 },
211 Some(Err(e)) => Err(e.into()),
212 None => Ok(None),
213 }
214 }
215}
216
217impl Stream for QueryStream {
218 type Item = crate::Result<String>;
219
220 fn poll_next(
221 self: std::pin::Pin<&mut Self>,
222 cx: &mut core::task::Context<'_>,
223 ) -> core::task::Poll<Option<Self::Item>> {
224 let mut this = self.project();
225
226 match core::pin::Pin::new(&mut this.ws).poll_next(cx) {
227 core::task::Poll::Ready(val) => {
228 let res: Option<Self::Item> = match val {
229 Some(Ok(WSMessage::Binary(m))) => Some(Ok(String::from_utf8_lossy(&m).into())),
230 Some(Ok(msg)) => Some(Err(crate::Error::Volga(Some(format!(
231 "Unexpected message ({msg:?})",
232 ))))),
233 Some(Err(e)) => Some(Err(e.into())),
234 None => None,
235 };
236
237 core::task::Poll::Ready(res)
238 }
239 core::task::Poll::Pending => core::task::Poll::Pending,
240 }
241 }
242}
243
244#[cfg(test)]
245mod test {
246
247 #[test]
248 fn setter() {
249 let query = super::Query::new().application("foo");
250 assert_eq!(&query.application.unwrap(), "foo");
251 }
252}