posthog_cli/utils/
query.rs

1use std::collections::HashMap;
2
3use anyhow::Error;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7// TODO - we could formalise a lot of this and move it into posthog-rs, tbh
8
/// JSON body posted to the query endpoint by `run_query` and `check_query`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryRequest {
    /// The query payload; see `Query` for how it is tagged on the wire.
    pub query: Query,
    /// Refresh mode for the request. Left out of the serialized JSON
    /// entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub refresh: Option<QueryRefresh>,
}
15
/// The query payloads this module knows how to send.
///
/// Internally tagged: the variant name is written to a `kind` field that sits
/// next to the variant's own fields in the serialized object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum Query {
    /// A HogQL query; `query` holds the query text. Built by `run_query`.
    HogQLQuery { query: String },
    /// A metadata request about a query. Built by `check_query`.
    HogQLMetadata(MetadataQuery),
}
22
/// Refresh mode sent with a `QueryRequest`.
///
/// Variant names are serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueryRefresh {
    /// Serialized as `"blocking"`; this is what `run_query` sends.
    Blocking,
}
28
/// Payload of a `Query::HogQLMetadata` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataQuery {
    /// Language of the text in `query`.
    pub language: MetadataLanguage,
    /// The query text the metadata request is about.
    pub query: String,
    /// Optional nested query, omitted from the serialized JSON when `None`.
    /// Boxed because `Query` itself can contain a `MetadataQuery`.
    /// `check_query` always leaves this unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<Box<Query>>,
}
36
/// Language identifier carried by a `MetadataQuery`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetadataLanguage {
    /// Serialized as the string `"hogQL"`.
    #[serde(rename = "hogQL")]
    HogQL,
}
42
/// Outcome of a query whose HTTP exchange completed: `Ok` holds the body of a
/// success-status response, `Err` holds the error body of a non-success one.
/// Transport and decoding failures are reported separately by `run_query`.
pub type HogQLQueryResult = Result<HogQLQueryResponse, HogQLQueryErrorResponse>;
44
/// Body of a success-status response to a HogQL query, as parsed by `run_query`.
///
/// Fields using `null_is_empty` / `null_is_false` together with `default`
/// accept either a missing key or an explicit JSON `null`, and fall back to an
/// empty vector / `false` respectively.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HogQLQueryResponse {
    pub cache_key: Option<String>,
    pub cache_target_age: Option<String>,
    pub clickhouse: Option<String>, // Clickhouse query text
    #[serde(default, deserialize_with = "null_is_empty")]
    pub columns: Vec<String>, // Columns returned from the query
    pub error: Option<String>,
    #[serde(default, deserialize_with = "null_is_empty")]
    pub explain: Vec<String>,
    // Read from / written to the camelCase key `hasMore`.
    #[serde(default, rename = "hasMore", deserialize_with = "null_is_false")]
    pub has_more: bool,
    pub hogql: Option<String>, // HogQL query text
    #[serde(default, deserialize_with = "null_is_false")]
    pub is_cached: bool,
    pub last_refresh: Option<String>, // Last time the query was refreshed
    pub next_allowed_client_refresh_time: Option<String>, // Next time the client can refresh the query
    pub offset: Option<i64>,                              // Offset of the response rows
    pub limit: Option<i64>,                               // Limit of the query
    pub query: Option<String>,                            // Query text
    // Pairs of strings; presumably (column name, column type) - TODO confirm.
    #[serde(default, deserialize_with = "null_is_empty")]
    pub types: Vec<(String, String)>,
    // Result rows; each row is a list of arbitrary JSON values.
    #[serde(default, deserialize_with = "null_is_empty")]
    pub results: Vec<Vec<Value>>,
    #[serde(default, deserialize_with = "null_is_empty")]
    pub timings: Vec<Timing>,
    // Catch-all for any top-level keys not named above; flattened back into
    // the top-level object when serialized.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub other: HashMap<String, Value>,
}
74
/// Error body parsed by `run_query` when the response status is not a success.
///
/// `code`, `detail` and `type` are all required string keys; deserialization
/// fails if any is missing. Displayed as `<type> (<code>): <detail>`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HogQLQueryErrorResponse {
    pub code: String,
    pub detail: String,
    // `type` is a Rust keyword, so the JSON key `type` is mapped to this field.
    #[serde(rename = "type")]
    pub error_type: String,
    // Catch-all for any other top-level keys in the error body.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub other: HashMap<String, Value>,
}
84
/// One entry of `HogQLQueryResponse::timings`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Timing {
    // NOTE(review): presumably the label/key of the timed step - confirm against the API.
    pub k: String,
    // NOTE(review): presumably the measured time for `k`; units are not visible here - confirm.
    pub t: f64,
}
90
/// Body of a metadata response, as parsed by `check_query`.
///
/// Every named field has a default, so any JSON object deserializes into this
/// type; keys that are not named here end up in `other`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct MetadataResponse {
    // The three notice lists accept a missing key or `null` as "empty".
    #[serde(default, deserialize_with = "null_is_empty")]
    pub errors: Vec<Notice>,
    #[serde(default, deserialize_with = "null_is_empty")]
    pub notices: Vec<Notice>,
    #[serde(default, deserialize_with = "null_is_empty")]
    pub warnings: Vec<Notice>,
    // Read from / written to the camelCase key `isUsingIndices`.
    #[serde(default, rename = "isUsingIndices")]
    pub is_using_indices: Option<IndicesUsage>,
    // Read from / written to the camelCase key `isValid`; missing or `null` means `false`.
    #[serde(default, deserialize_with = "null_is_false", rename = "isValid")]
    pub is_valid: bool,
    // NOTE(review): unlike `is_valid` and `is_using_indices`, this field has no
    // camelCase rename, so it is only filled from a key literally named
    // `is_valid_view`. If the API sends `isValidView`, that value lands in
    // `other` and this stays `false` - confirm the wire name.
    #[serde(default, deserialize_with = "null_is_false")]
    pub is_valid_view: bool,
    #[serde(default, deserialize_with = "null_is_empty")]
    pub table_names: Vec<String>,
    // Catch-all for any top-level keys not named above.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub other: HashMap<String, Value>,
}
110
/// Value of `MetadataResponse::is_using_indices`.
///
/// Variants are (de)serialized as their lowercase names: `"undecisive"`,
/// `"no"`, `"partial"` and `"yes"`.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum IndicesUsage {
    Undecisive,
    No,
    Partial,
    Yes,
}
119
/// A single entry in the `errors`, `notices` or `warnings` list of a
/// `MetadataResponse`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Notice {
    /// Text of the notice; the only required key.
    pub message: String,
    // Flattened: `start` and `end` are read from the same JSON object as
    // `message` rather than from a nested `span` object.
    #[serde(flatten)]
    pub span: Option<NoticeSpan>,
    // Catch-all for any other keys on the notice object.
    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
    pub other: HashMap<String, Value>,
}
128
/// The `start`/`end` pair attached to a `Notice`.
///
/// NOTE(review): presumably offsets into the query text the notice refers to;
/// whether they count bytes or characters is not visible here - confirm.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NoticeSpan {
    pub start: usize,
    pub end: usize,
}
134
135fn null_is_empty<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
136where
137    D: serde::Deserializer<'de>,
138    T: serde::Deserialize<'de>,
139{
140    let opt = Option::deserialize(deserializer)?;
141    match opt {
142        Some(v) => Ok(v),
143        None => Ok(Vec::new()),
144    }
145}
146
147fn null_is_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
148where
149    D: serde::Deserializer<'de>,
150{
151    let opt = Option::deserialize(deserializer)?;
152    match opt {
153        Some(v) => Ok(v),
154        None => Ok(false),
155    }
156}
157
158pub fn run_query(endpoint: &str, token: &str, to_run: &str) -> Result<HogQLQueryResult, Error> {
159    let client = reqwest::blocking::Client::new();
160
161    let request = QueryRequest {
162        query: Query::HogQLQuery {
163            query: to_run.to_string(),
164        },
165        refresh: Some(QueryRefresh::Blocking),
166    };
167
168    let response = client
169        .post(endpoint)
170        .json(&request)
171        .bearer_auth(token)
172        .send()?;
173
174    let code = response.status();
175    let body = response.text()?;
176
177    let value: Value = serde_json::from_str(&body)?;
178
179    if !code.is_success() {
180        let error: HogQLQueryErrorResponse = serde_json::from_value(value)?;
181        return Ok(Err(error));
182    }
183
184    // NOTE: We don't do any pagination here, because the HogQLQuery runner doesn't support it
185    let response: HogQLQueryResponse = serde_json::from_value(value)?;
186    Ok(Ok(response))
187}
188
189pub fn check_query(endpoint: &str, token: &str, to_run: &str) -> Result<MetadataResponse, Error> {
190    let client = reqwest::blocking::Client::new();
191
192    let query = MetadataQuery {
193        language: MetadataLanguage::HogQL,
194        query: to_run.to_string(),
195        source: None, // TODO - allow for this to be set? Idk if it matters much
196    };
197
198    let query = Query::HogQLMetadata(query);
199
200    let request = QueryRequest {
201        query,
202        refresh: None,
203    };
204
205    let response = client
206        .post(endpoint)
207        .json(&request)
208        .bearer_auth(token)
209        .send()?;
210
211    let code = response.status();
212    let body = response.text()?;
213
214    let value: Value = serde_json::from_str(&body)?;
215
216    if !code.is_success() {
217        let error: MetadataResponse = serde_json::from_value(value)?;
218        return Ok(error);
219    }
220
221    let response: MetadataResponse = serde_json::from_value(value)?;
222
223    Ok(response)
224}
225
226impl std::error::Error for HogQLQueryErrorResponse {}
227
228impl std::fmt::Display for HogQLQueryErrorResponse {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        write!(f, "{} ({}): {}", self.error_type, self.code, self.detail)
231    }
232}