1use bitflags::bitflags;
2use bitflags_serde_shim::impl_serde_for_bitflags;
3use chrono::{DateTime, Utc};
4use http::header::HeaderValue;
5use serde::{
6 de::{self, Visitor},
7 Deserialize, Deserializer, Serialize, Serializer,
8};
9use serde_json::value::Value as JsonValue;
10use std::{
11 collections::HashMap,
12 fmt::{self, Display},
13 ops::Add,
14 str::FromStr,
15};
16
17pub use table::*;
18
19use crate::serde::deserialize_null_default;
20
21pub static TIMESTAMP_FIELD: &str = "_time";
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28#[non_exhaustive]
29pub enum ContentType {
30 Json,
32 NdJson,
35 Csv,
37}
38
39impl ContentType {
40 #[must_use]
42 pub fn as_str(&self) -> &'static str {
43 match self {
44 ContentType::Json => "application/json",
45 ContentType::NdJson => "application/x-ndjson",
46 ContentType::Csv => "text/csv",
47 }
48 }
49}
50
51impl Display for ContentType {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53 write!(f, "{}", self.as_str())
54 }
55}
56
57impl FromStr for ContentType {
58 type Err = crate::error::Error;
59
60 fn from_str(s: &str) -> Result<Self, Self::Err> {
61 match s {
62 "application/json" => Ok(ContentType::Json),
63 "application/x-ndjson" => Ok(ContentType::NdJson),
64 "text/csv" => Ok(ContentType::Csv),
65 _ => Err(crate::error::Error::InvalidContentType(s.to_string())),
66 }
67 }
68}
69
70impl From<ContentType> for HeaderValue {
71 fn from(content_type: ContentType) -> Self {
72 HeaderValue::from_static(content_type.as_str())
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78#[non_exhaustive]
79pub enum ContentEncoding {
80 Identity,
82 Gzip,
84 Zstd,
86}
87
88impl ContentEncoding {
89 #[must_use]
91 pub fn as_str(&self) -> &'static str {
92 match self {
93 ContentEncoding::Identity => "",
94 ContentEncoding::Gzip => "gzip",
95 ContentEncoding::Zstd => "zstd",
96 }
97 }
98}
99
100impl Display for ContentEncoding {
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
102 write!(f, "{}", self.as_str())
103 }
104}
105
106impl FromStr for ContentEncoding {
107 type Err = crate::error::Error;
108
109 fn from_str(s: &str) -> Result<Self, Self::Err> {
110 match s {
111 "" => Ok(ContentEncoding::Identity),
112 "gzip" => Ok(ContentEncoding::Gzip),
113 "zstd" => Ok(ContentEncoding::Zstd),
114 _ => Err(crate::error::Error::InvalidContentEncoding(s.to_string())),
115 }
116 }
117}
118
119impl From<ContentEncoding> for HeaderValue {
120 fn from(content_encoding: ContentEncoding) -> Self {
121 HeaderValue::from_static(content_encoding.as_str())
122 }
123}
124
125#[derive(Serialize, Deserialize, Debug)]
127pub struct Dataset {
128 pub name: String,
130 pub description: String,
132 #[serde(rename = "who")]
134 pub created_by: String,
135 #[serde(rename = "created")]
137 pub created_at: DateTime<Utc>,
138 }
140
141#[derive(Serialize, Deserialize, Debug)]
143#[serde(rename_all = "camelCase")]
144pub struct Stat {
145 pub name: String,
147 pub num_events: u64,
149 pub num_fields: u32,
151 pub input_bytes: u64,
153 pub compressed_bytes: u64,
155 pub min_time: Option<DateTime<Utc>>,
157 pub max_time: Option<DateTime<Utc>>,
159 #[serde(rename = "created")]
161 pub created_at: DateTime<Utc>,
162}
163
164#[derive(Serialize, Deserialize, Debug)]
166#[serde(rename_all = "camelCase")]
167pub struct Info {
168 #[serde(flatten)]
170 pub stat: Stat,
171 pub fields: Vec<Field>,
173}
174
175#[derive(Serialize, Deserialize, Debug, Default)]
177#[serde(rename_all = "camelCase")]
178pub struct IngestStatus {
179 pub ingested: u64,
181 pub failed: u64,
183 pub failures: Vec<IngestFailure>,
185 pub processed_bytes: u64,
187 #[deprecated(
189 since = "0.8.0",
190 note = "This field will be removed in a future version."
191 )]
192 pub blocks_created: u32,
193 #[deprecated(
195 since = "0.8.0",
196 note = "This field will be removed in a future version."
197 )]
198 pub wal_length: u32,
199}
200
201impl Add for IngestStatus {
202 type Output = Self;
203
204 fn add(self, other: Self) -> Self {
205 let mut failures = self.failures;
206 failures.extend(other.failures);
207
208 #[allow(deprecated)]
209 Self {
210 ingested: self.ingested + other.ingested,
211 failed: self.failed + other.failed,
212 failures,
213 processed_bytes: self.processed_bytes + other.processed_bytes,
214 blocks_created: self.blocks_created + other.blocks_created,
215 wal_length: other.wal_length,
216 }
217 }
218}
219
220#[derive(Serialize, Deserialize, Debug)]
222pub struct IngestFailure {
223 pub timestamp: DateTime<Utc>,
225 pub error: String,
227}
228
229#[derive(Serialize, Debug)]
231pub(crate) struct DatasetCreateRequest {
232 pub name: String,
236 pub description: String,
238}
239
240#[derive(Serialize, Deserialize, Debug)]
242pub(crate) struct DatasetUpdateRequest {
243 pub description: String,
245}
246
247#[derive(Serialize, Deserialize, Debug, Default, Eq, PartialEq)]
250#[serde(rename_all = "camelCase")]
251pub struct Query {
252 pub apl: String,
254 pub start_time: Option<DateTime<Utc>>,
256 pub end_time: Option<DateTime<Utc>>,
258 pub cursor: Option<String>,
260 pub include_cursor: bool,
262 pub include_cursor_field: bool,
264}
265
266impl Query {
267 pub fn new<S: ToString + ?Sized>(apl: &S, opts: QueryOptions) -> Self {
269 Self {
270 apl: apl.to_string(),
271 start_time: opts.start_time,
272 end_time: opts.end_time,
273 cursor: opts.cursor,
274 include_cursor: opts.include_cursor,
275 include_cursor_field: opts.include_cursor_field,
276 }
277 }
278}
279
280#[allow(clippy::trivially_copy_pass_by_ref)]
282fn is_false(b: &bool) -> bool {
283 !*b
284}
285
286#[derive(Serialize, Debug, Default)]
288pub(crate) struct QueryParams {
289 #[serde(rename = "nocache", skip_serializing_if = "is_false")]
290 pub no_cache: bool,
291 #[serde(rename = "saveAsKind", skip_serializing_if = "Option::is_none")]
292 pub save: Option<QueryKind>,
293 pub format: AplResultFormat,
294}
295
296impl From<&QueryOptions> for QueryParams {
297 fn from(options: &QueryOptions) -> Self {
298 let save = if options.save {
299 Some(QueryKind::Apl)
300 } else {
301 None
302 };
303 Self {
304 no_cache: options.no_cache,
305 save,
306 format: options.format,
307 }
308 }
309}
310
311#[allow(clippy::struct_excessive_bools)]
313#[derive(Debug, Default, Clone)]
315pub struct QueryOptions {
316 pub start_time: Option<DateTime<Utc>>,
318 pub end_time: Option<DateTime<Utc>>,
320 pub cursor: Option<String>,
322 pub include_cursor: bool,
325 pub no_cache: bool,
327 pub save: bool,
330 pub format: AplResultFormat,
332 pub include_cursor_field: bool,
334}
335
336#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Default)]
338#[non_exhaustive]
339#[serde(rename_all = "camelCase")]
340pub enum AplResultFormat {
341 #[default]
343 Tabular,
344}
345
346#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
348#[non_exhaustive]
349#[serde(rename_all = "camelCase")]
350pub enum QueryKind {
351 #[default]
353 Analytics,
354 Stream,
356 Apl,
358}
359
360#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
362pub struct Projection {
363 pub field: String,
365 pub alias: Option<String>,
367}
368
369#[derive(Debug, PartialEq, Eq)]
371#[non_exhaustive]
372pub enum AggregationOp {
373 Count,
375 CountDistinct,
377 MakeSet,
379 MakeSetIf,
381
382 Sum,
384 Avg,
386 Min,
388 Max,
390 Topk,
392 Percentiles,
394 Histogram,
396 StandardDeviation,
398 Variance,
400 ArgMin,
402 ArgMax,
404
405 CountIf,
408 DistinctIf,
411
412 Unknown(String),
414}
415
416impl Serialize for AggregationOp {
417 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
418 where
419 S: Serializer,
420 {
421 serializer.serialize_str(match self {
422 Self::Count => "count",
423 Self::CountDistinct => "distinct",
424 Self::MakeSet => "makeset",
425 Self::MakeSetIf => "makesetif",
426 Self::Sum => "sum",
427 Self::Avg => "avg",
428 Self::Min => "min",
429 Self::Max => "max",
430 Self::Topk => "topk",
431 Self::Percentiles => "percentiles",
432 Self::Histogram => "histogram",
433 Self::StandardDeviation => "stdev",
434 Self::Variance => "variance",
435 Self::ArgMin => "argmin",
436 Self::ArgMax => "argmax",
437 Self::CountIf => "countif",
438 Self::DistinctIf => "distinctif",
439 Self::Unknown(ref s) => s,
440 })
441 }
442}
443
444struct AggregationOpVisitor;
445
446impl Visitor<'_> for AggregationOpVisitor {
447 type Value = AggregationOp;
448
449 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
450 write!(formatter, "a valid aggregation op string")
451 }
452
453 fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
454 where
455 E: de::Error,
456 {
457 match s {
458 "count" => Ok(Self::Value::Count),
459 "distinct" => Ok(Self::Value::CountDistinct),
460 "makeset" => Ok(Self::Value::MakeSet),
461 "makesetif" => Ok(Self::Value::MakeSetIf),
462 "sum" => Ok(Self::Value::Sum),
463 "avg" => Ok(Self::Value::Avg),
464 "min" => Ok(Self::Value::Min),
465 "max" => Ok(Self::Value::Max),
466 "topk" => Ok(Self::Value::Topk),
467 "percentiles" => Ok(Self::Value::Percentiles),
468 "histogram" => Ok(Self::Value::Histogram),
469 "stdev" => Ok(Self::Value::StandardDeviation),
470 "variance" => Ok(Self::Value::Variance),
471 "argmin" => Ok(Self::Value::ArgMin),
472 "argmax" => Ok(Self::Value::ArgMax),
473 "countif" => Ok(Self::Value::CountIf),
474 "distinctif" => Ok(Self::Value::DistinctIf),
475 aggregation => Ok(Self::Value::Unknown(aggregation.to_string())),
476 }
477 }
478}
479
480impl<'de> Deserialize<'de> for AggregationOp {
481 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
482 where
483 D: Deserializer<'de>,
484 {
485 deserializer.deserialize_str(AggregationOpVisitor {})
486 }
487}
488
489#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
491pub struct Aggregation {
492 pub alias: Option<String>,
494 pub op: AggregationOp,
496 pub field: String,
498 #[serde(skip_serializing_if = "Option::is_none")]
502 pub argument: Option<JsonValue>,
503}
504
505#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
507#[non_exhaustive]
508#[serde(rename_all = "lowercase")]
509enum FilterOp {
510 And,
512 Or,
514 Not,
516
517 #[serde(rename = "==")]
520 Equal,
521 #[serde(rename = "!=")]
523 NotEqual,
524 Exists,
526 NotExists,
528
529 #[serde(rename = ">")]
532 GreaterThan,
533 #[serde(rename = ">=")]
535 GreaterThanEqual,
536 #[serde(rename = "<")]
538 LessThan,
539 #[serde(rename = "<=")]
541 LessThanEqual,
542
543 StartsWith,
546 NotStartsWith,
548 EndsWith,
550 NotEndsWith,
552 Regexp,
554 NotRegexp,
556
557 Contains,
560 NotContains,
562}
563
564#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
566#[serde(rename_all = "camelCase")]
567struct Filter {
568 pub op: FilterOp,
570 pub field: String,
572 pub value: JsonValue,
574 #[serde(default)]
576 pub case_insensitive: bool,
577 #[serde(default, deserialize_with = "deserialize_null_default")]
579 pub children: Vec<Filter>,
580}
581
582impl Default for Filter {
583 fn default() -> Self {
584 Filter {
585 op: FilterOp::Equal,
586 field: String::new(),
587 value: JsonValue::Null,
588 case_insensitive: false,
589 children: vec![],
590 }
591 }
592}
593
594#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
598pub struct VirtualField {
599 pub alias: String,
601 pub expr: String,
603}
604
605mod table;
606#[derive(Serialize, Deserialize, Debug)]
608#[serde(rename_all = "camelCase")]
609pub struct QueryResult {
610 pub status: QueryStatus,
612
613 pub tables: Vec<table::Table>,
615 #[serde(skip)]
619 pub saved_query_id: Option<String>,
620}
621
622#[derive(Serialize, Deserialize, Debug)]
624#[serde(rename_all = "camelCase")]
625pub struct QueryStatus {
626 pub elapsed_time: u64,
628 pub blocks_examined: u64,
630 pub rows_examined: u64,
632 pub rows_matched: u64,
634 pub num_groups: u32,
636 pub is_partial: bool,
638 pub continuation_token: Option<String>,
641 #[serde(default)]
643 pub is_estimate: bool,
644 pub cache_status: CacheStatus,
646 pub min_block_time: DateTime<Utc>,
648 pub max_block_time: DateTime<Utc>,
650 #[serde(default, deserialize_with = "deserialize_null_default")]
652 pub messages: Vec<QueryMessage>,
653 pub max_cursor: Option<String>,
655 pub min_cursor: Option<String>,
657}
658
659bitflags! {
660 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
662 pub struct CacheStatus: u32 {
663 const Miss = 1;
665 const Materialized = 2;
667 const Results = 4;
669 const WalCached = 8;
671 }
672}
673impl_serde_for_bitflags!(CacheStatus);
674
675#[derive(Serialize, Deserialize, Debug)]
677pub struct QueryMessage {
678 priority: QueryMessagePriority,
679 count: u32,
680 code: QueryMessageCode,
681 text: Option<String>,
682}
683
684#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)]
686#[non_exhaustive]
687#[serde(rename_all = "camelCase")]
688pub enum QueryMessagePriority {
689 Trace,
691 Debug,
693 Info,
695 Warn,
697 Error,
699 Fatal,
701}
702
703#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Copy)]
705#[non_exhaustive]
706#[serde(rename_all = "snake_case")]
707pub enum QueryMessageCode {
708 VirtualFieldFinalizeError,
710 MissingColumn,
712 DefaultLimitWarning,
714 LicenseLimitForQueryWarning,
716 #[serde(other)]
718 Unknown,
719}
720
721#[derive(Serialize, Deserialize, Debug)]
723pub struct Entry {
724 #[serde(rename = "_time")]
727 pub time: DateTime<Utc>,
728 #[serde(rename = "_sysTime")]
730 pub sys_time: DateTime<Utc>,
731 #[serde(rename = "_rowId")]
733 pub row_id: String,
734 pub data: HashMap<String, JsonValue>,
737}
738
739#[derive(Serialize, Deserialize, Debug)]
741pub struct Timeseries {
742 pub series: Vec<Interval>,
744 pub totals: Vec<EntryGroup>,
746}
747
748#[derive(Serialize, Deserialize, Debug)]
750#[serde(rename_all = "camelCase")]
751pub struct Interval {
752 pub start_time: DateTime<Utc>,
754 pub end_time: DateTime<Utc>,
756 #[serde(default, deserialize_with = "deserialize_null_default")]
758 pub groups: Vec<EntryGroup>,
759}
760
761#[derive(Serialize, Deserialize, Debug)]
763pub struct EntryGroup {
764 pub id: u64,
766 pub group: HashMap<String, JsonValue>,
768 pub aggregations: Vec<EntryGroupAgg>,
770}
771
772#[derive(Serialize, Deserialize, Debug)]
774pub struct EntryGroupAgg {
775 #[serde(rename = "op")]
777 pub alias: String,
778 pub value: JsonValue,
780}
781
782#[cfg(test)]
783mod test {
784 use super::*;
785
786 #[test]
787 fn test_aggregation_op() {
788 let enum_repr = AggregationOp::Count;
789 let json_repr = r#""count""#;
790 assert_eq!(
791 serde_json::to_string(&enum_repr).expect("json error"),
792 json_repr
793 );
794 assert_eq!(
795 serde_json::from_str::<AggregationOp>(json_repr).expect("json error"),
796 enum_repr
797 );
798 }
799
800 #[test]
801 fn test_filter_op() {
802 let enum_repr = FilterOp::And;
803 let json_repr = r#""and""#;
804 assert_eq!(
805 serde_json::to_string(&enum_repr).expect("json error"),
806 json_repr
807 );
808 assert_eq!(
809 serde_json::from_str::<FilterOp>(json_repr).expect("json error"),
810 enum_repr
811 );
812
813 let enum_repr = FilterOp::Equal;
814 let json_repr = r#""==""#;
815 assert_eq!(
816 serde_json::to_string(&enum_repr).expect("json error"),
817 json_repr
818 );
819 assert_eq!(
820 serde_json::from_str::<FilterOp>(json_repr).expect("json error"),
821 enum_repr
822 );
823 }
824
825 #[test]
826 fn test_kind_false() {
827 let query = QueryParams {
828 no_cache: false,
829 save: None,
830 format: AplResultFormat::Tabular,
831 };
832 let json_repr = r#"{"format":"tabular"}"#;
833 assert_eq!(
834 serde_json::to_string(&query).expect("json error"),
835 json_repr
836 );
837
838 let query = QueryParams {
839 no_cache: true,
840 save: None,
841 format: AplResultFormat::Tabular,
842 };
843 let json_repr = r#"{"nocache":true,"format":"tabular"}"#;
844 assert_eq!(
845 serde_json::to_string(&query).expect("json error"),
846 json_repr
847 );
848
849 let query = QueryParams {
850 no_cache: false,
851 save: Some(QueryKind::Apl),
852 format: AplResultFormat::Tabular,
853 };
854 let json_repr = r#"{"saveAsKind":"apl","format":"tabular"}"#;
855 assert_eq!(
856 serde_json::to_string(&query).expect("json error"),
857 json_repr
858 );
859 }
860}