posthog_cli/experimental/query/
mod.rs

1use std::collections::HashMap;
2
3use anyhow::Error;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7pub mod command;
8
9// TODO - we could formalise a lot of this and move it into posthog-rs, tbh
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct QueryRequest {
13    pub query: Query,
14    #[serde(skip_serializing_if = "Option::is_none")]
15    pub refresh: Option<QueryRefresh>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19#[serde(tag = "kind")]
20pub enum Query {
21    HogQLQuery { query: String },
22    HogQLMetadata(MetadataQuery),
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(rename_all = "snake_case")]
27pub enum QueryRefresh {
28    Blocking,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MetadataQuery {
33    pub language: MetadataLanguage,
34    pub query: String,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub source: Option<Box<Query>>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub enum MetadataLanguage {
41    #[serde(rename = "hogQL")]
42    HogQL,
43}
44
45pub type HogQLQueryResult = Result<HogQLQueryResponse, HogQLQueryErrorResponse>;
46
47#[derive(Debug, Clone, Deserialize, Serialize)]
48pub struct HogQLQueryResponse {
49    pub cache_key: Option<String>,
50    pub cache_target_age: Option<String>,
51    pub clickhouse: Option<String>, // Clickhouse query text
52    #[serde(default, deserialize_with = "null_is_empty")]
53    pub columns: Vec<String>, // Columns returned from the query
54    pub error: Option<String>,
55    #[serde(default, deserialize_with = "null_is_empty")]
56    pub explain: Vec<String>,
57    #[serde(default, rename = "hasMore", deserialize_with = "null_is_false")]
58    pub has_more: bool,
59    pub hogql: Option<String>, // HogQL query text
60    #[serde(default, deserialize_with = "null_is_false")]
61    pub is_cached: bool,
62    pub last_refresh: Option<String>, // Last time the query was refreshed
63    pub next_allowed_client_refresh_time: Option<String>, // Next time the client can refresh the query
64    pub offset: Option<i64>,                              // Offset of the response rows
65    pub limit: Option<i64>,                               // Limit of the query
66    pub query: Option<String>,                            // Query text
67    #[serde(default, deserialize_with = "null_is_empty")]
68    pub types: Vec<(String, String)>,
69    #[serde(default, deserialize_with = "null_is_empty")]
70    pub results: Vec<Vec<Value>>,
71    #[serde(default, deserialize_with = "null_is_empty")]
72    pub timings: Vec<Timing>,
73    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
74    pub other: HashMap<String, Value>,
75}
76
77#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct HogQLQueryErrorResponse {
79    pub code: String,
80    pub detail: String,
81    #[serde(rename = "type")]
82    pub error_type: String,
83    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
84    pub other: HashMap<String, Value>,
85}
86
87#[derive(Debug, Clone, Deserialize, Serialize)]
88pub struct Timing {
89    pub k: String,
90    pub t: f64,
91}
92
93#[derive(Debug, Clone, Deserialize, Serialize)]
94pub struct MetadataResponse {
95    #[serde(default, deserialize_with = "null_is_empty")]
96    pub errors: Vec<Notice>,
97    #[serde(default, deserialize_with = "null_is_empty")]
98    pub notices: Vec<Notice>,
99    #[serde(default, deserialize_with = "null_is_empty")]
100    pub warnings: Vec<Notice>,
101    #[serde(default, rename = "isUsingIndices")]
102    pub is_using_indices: Option<IndicesUsage>,
103    #[serde(default, deserialize_with = "null_is_false", rename = "isValid")]
104    pub is_valid: bool,
105    #[serde(default, deserialize_with = "null_is_false")]
106    pub is_valid_view: bool,
107    #[serde(default, deserialize_with = "null_is_empty")]
108    pub table_names: Vec<String>,
109    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
110    pub other: HashMap<String, Value>,
111}
112
113#[derive(Debug, Clone, Deserialize, Serialize)]
114#[serde(rename_all = "lowercase")]
115pub enum IndicesUsage {
116    Undecisive,
117    No,
118    Partial,
119    Yes,
120}
121
122#[derive(Debug, Clone, Deserialize, Serialize)]
123pub struct Notice {
124    pub message: String,
125    #[serde(flatten)]
126    pub span: Option<NoticeSpan>,
127    #[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
128    pub other: HashMap<String, Value>,
129}
130
131#[derive(Debug, Clone, Deserialize, Serialize)]
132pub struct NoticeSpan {
133    pub start: usize,
134    pub end: usize,
135}
136
137fn null_is_empty<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
138where
139    D: serde::Deserializer<'de>,
140    T: serde::Deserialize<'de>,
141{
142    let opt = Option::deserialize(deserializer)?;
143    match opt {
144        Some(v) => Ok(v),
145        None => Ok(Vec::new()),
146    }
147}
148
149fn null_is_false<'de, D>(deserializer: D) -> Result<bool, D::Error>
150where
151    D: serde::Deserializer<'de>,
152{
153    let opt = Option::deserialize(deserializer)?;
154    match opt {
155        Some(v) => Ok(v),
156        None => Ok(false),
157    }
158}
159
160pub fn run_query(endpoint: &str, token: &str, to_run: &str) -> Result<HogQLQueryResult, Error> {
161    let client = reqwest::blocking::Client::new();
162
163    let request = QueryRequest {
164        query: Query::HogQLQuery {
165            query: to_run.to_string(),
166        },
167        refresh: Some(QueryRefresh::Blocking),
168    };
169
170    let response = client
171        .post(endpoint)
172        .json(&request)
173        .bearer_auth(token)
174        .send()?;
175
176    let code = response.status();
177    let body = response.text()?;
178
179    let value: Value = serde_json::from_str(&body)?;
180
181    if !code.is_success() {
182        let error: HogQLQueryErrorResponse = serde_json::from_value(value)?;
183        return Ok(Err(error));
184    }
185
186    // NOTE: We don't do any pagination here, because the HogQLQuery runner doesn't support it
187    let response: HogQLQueryResponse = serde_json::from_value(value)?;
188    Ok(Ok(response))
189}
190
191pub fn check_query(endpoint: &str, token: &str, to_run: &str) -> Result<MetadataResponse, Error> {
192    let client = reqwest::blocking::Client::new();
193
194    let query = MetadataQuery {
195        language: MetadataLanguage::HogQL,
196        query: to_run.to_string(),
197        source: None, // TODO - allow for this to be set? Idk if it matters much
198    };
199
200    let query = Query::HogQLMetadata(query);
201
202    let request = QueryRequest {
203        query,
204        refresh: None,
205    };
206
207    let response = client
208        .post(endpoint)
209        .json(&request)
210        .bearer_auth(token)
211        .send()?;
212
213    let code = response.status();
214    let body = response.text()?;
215
216    let value: Value = serde_json::from_str(&body)?;
217
218    if !code.is_success() {
219        let error: MetadataResponse = serde_json::from_value(value)?;
220        return Ok(error);
221    }
222
223    let response: MetadataResponse = serde_json::from_value(value)?;
224
225    Ok(response)
226}
227
228impl std::error::Error for HogQLQueryErrorResponse {}
229
230impl std::fmt::Display for HogQLQueryErrorResponse {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        write!(f, "{} ({}): {}", self.error_type, self.code, self.detail)
233    }
234}