Skip to main content

helix_db/
lib.rs

1//! # helix-db Rust SDK
2//!
3//! Crate root. The query-builder DSL lives in [`dsl`] and the query bundle /
4//! code-generation support lives in [`query_generator`].
5//!
6//! Most application code only needs the curated builder API:
7//! ```
8//! use helix_db::dsl::prelude::*;
9//! ```
10
11pub mod dsl;
12pub mod query_generator;
13
14use std::marker::PhantomData;
15
16// Re-export the DSL surface (types, builders, `prelude`, etc.) at the crate
17// root. This is also what makes the `crate::*` paths used inside `dsl.rs` and
18// `query_generator.rs` resolve.
19pub use dsl::*;
20
21// Convenience re-export so `helix_db::prelude::*` is reachable directly, in
22// addition to the canonical `helix_db::dsl::prelude::*`.
23pub use dsl::prelude;
24
25use reqwest::{Client as ReqwestClient, StatusCode};
26use serde::{Deserialize, Serialize};
27use thiserror::Error;
28
29/// Async HTTP client for running queries against a Helix instance.
30///
31/// Reachable as `helix_db::Client`.
32#[derive(Debug, Clone)]
33pub struct Client {
34    client: ReqwestClient,
35    url: reqwest::Url,
36    api_key: Option<String>,
37}
38
39/// Backwards-compatible alias for [`Client`].
40pub type HelixDBClient = Client;
41
42#[derive(Debug, Error)]
43pub enum HelixError {
44    #[error("Error communicating with server: {0}")]
45    ReqwestError(#[from] reqwest::Error),
46    #[error("Got Error from server: {details}")]
47    RemoteError { details: String },
48    #[error("Error serializing data: {0}")]
49    SerializationError(#[from] sonic_rs::Error),
50    #[error("Invalid URL: {0}")]
51    InvalidURL(String),
52}
53
54impl Client {
55    pub fn new(url: Option<&str>) -> Result<Self, HelixError> {
56        // Resolve the base query endpoint up front. `send()` reuses this for
57        // dynamic queries and appends `/<name>` for stored queries.
58        let url = reqwest::Url::parse(url.unwrap_or("http://localhost:6969"))
59            .map_err(|e| HelixError::InvalidURL(e.to_string()))?
60            .join("/v1/query")
61            .map_err(|e| HelixError::InvalidURL(e.to_string()))?;
62        Ok(Self {
63            client: ReqwestClient::new(),
64            url,
65            api_key: None,
66        })
67    }
68
69    pub fn with_api_key(mut self, api_key: Option<&str>) -> Self {
70        self.api_key = api_key.map(|key| key.to_string());
71        self
72    }
73
74    pub fn query<R: for<'de> Deserialize<'de>>(&self) -> QueryBuilder<'_, '_, R> {
75        QueryBuilder::new(self)
76    }
77}
78
79pub struct QueryBuilder<'hlx, 'a, R> {
80    client: &'hlx HelixDBClient,
81    query_type: QueryType,
82    headers: [Option<(&'a str, &'a str)>; 4],
83    body: Option<Vec<u8>>,
84    _phantom: PhantomData<R>,
85}
86
87#[derive(Default)]
88pub(crate) enum QueryType {
89    Stored(String),
90    Dynamic(DynamicQueryRequest),
91    #[default]
92    Empty,
93}
94
95impl<'hlx, 'a, R> QueryBuilder<'hlx, 'a, R> {
96    pub fn new(client: &'hlx HelixDBClient) -> Self {
97        let mut headers = [None; 4];
98        headers[0] = Some(("Content-Type", "application/json"));
99        Self {
100            client,
101            query_type: QueryType::default(),
102            headers,
103            body: None,
104            _phantom: PhantomData,
105        }
106    }
107
108    pub fn writer_only(mut self) -> Self {
109        self.headers[1] = Some(("x-helix-require-writer", "true"));
110        self
111    }
112
113    #[must_use]
114    pub fn warm_only(mut self) -> Self {
115        self.headers[2] = Some(("x-helix-warm", "true"));
116        self
117    }
118
119    pub fn should_await_durability(mut self, should: bool) -> Self {
120        self.headers[3] = Some((
121            "x-helix-await-durable",
122            if should { "true" } else { "false" },
123        ));
124        self
125    }
126
127    pub fn body<T: Serialize + Sync>(mut self, data: &T) -> Result<Self, HelixError> {
128        self.body = Some(sonic_rs::to_vec(data)?);
129        Ok(self)
130    }
131
132    pub fn stored_query(mut self, query_name: String) -> QueryRequest<'hlx, 'a, R> {
133        self.query_type = QueryType::Stored(query_name);
134        QueryRequest { request: self }
135    }
136
137    pub fn dynamic_query(mut self, query: DynamicQueryRequest) -> QueryRequest<'hlx, 'a, R> {
138        self.query_type = QueryType::Dynamic(query);
139        QueryRequest { request: self }
140    }
141}
142
143pub struct QueryRequest<'hlx, 'a, R> {
144    request: QueryBuilder<'hlx, 'a, R>,
145}
146
147impl<'hlx, 'a, R: for<'de> Deserialize<'de>> QueryRequest<'hlx, 'a, R> {
148    pub async fn send(self) -> Result<R, HelixError> {
149        let query_request = self.request;
150        let (url, body) = match query_request.query_type {
151            QueryType::Dynamic(query) => ("/v1/query".to_string(), Some(sonic_rs::to_vec(&query)?)),
152            QueryType::Stored(name) => (format!("/v1/query/{name}"), query_request.body),
153            QueryType::Empty => unreachable!(
154                "send() is only reachable after stored_query() or dynamic_query() sets query_type"
155            ),
156        };
157        let url = query_request
158            .client
159            .url
160            .join(&url)
161            .map_err(|e| HelixError::InvalidURL(e.to_string()))?;
162
163        let mut request = query_request.client.client.post(url);
164
165        for (k, v) in query_request.headers.into_iter().flatten() {
166            request = request.header(k, v);
167        }
168        if let Some(ref api_key) = query_request.client.api_key {
169            request = request.bearer_auth(api_key);
170        }
171        if let Some(body) = body {
172            request = request.body(body);
173        }
174
175        let response = request.send().await?;
176
177        match response.status() {
178            StatusCode::OK => {
179                let bytes = response.bytes().await?;
180                sonic_rs::from_slice::<R>(&bytes).map_err(Into::into)
181            }
182            code => match response.text().await {
183                Ok(t) => Err(HelixError::RemoteError { details: t }),
184                Err(_) => match code.canonical_reason() {
185                    Some(r) => Err(HelixError::RemoteError {
186                        details: r.to_string(),
187                    }),
188                    None => Err(HelixError::RemoteError {
189                        details: format!("unkown error with code: {code}"),
190                    }),
191                },
192            },
193        }
194    }
195}
196
197extern crate self as helix_db;
198
199#[cfg(test)]
200mod tests {
201    use helix_db::dsl::prelude::*;
202    use std::collections::BTreeMap;
203
204    #[register]
205    fn query1(name: String) {
206        // helix_db query that returns a read query or write query
207        read_batch()
208            .var_as("user", g().n_where(SourcePredicate::eq("username", name)))
209            .var_as(
210                "friends",
211                g().n(NodeRef::var("user"))
212                    .out(Some("FOLLOWS"))
213                    .dedup()
214                    .limit(100),
215            )
216            .returning(["user", "friends"])
217    }
218
219    #[test]
220    fn query1_builds_dynamic_request() {
221        // Calling the registered fn with concrete args yields a DynamicQueryRequest directly.
222        let query = query1(String::from("alice"));
223
224        assert!(matches!(query.request_type, DynamicQueryRequestType::Read));
225        let params = query.parameters.expect("parameters present");
226        assert!(matches!(
227            params.get("name"),
228            Some(DynamicQueryValue::String(s)) if s == "alice"
229        ));
230    }
231
232    // ---- Group 1: every #[register] param type coerces correctly -----------
233
234    #[register]
235    fn q_bool(flag: bool) {
236        read_batch()
237            .var_as("v", g().n_where(SourcePredicate::eq("field", flag)))
238            .returning(["v"])
239    }
240    #[register]
241    fn q_i64(num: i64) {
242        read_batch()
243            .var_as("v", g().n_where(SourcePredicate::eq("field", num)))
244            .returning(["v"])
245    }
246    #[register]
247    fn q_f64(x: f64) {
248        read_batch()
249            .var_as("v", g().n_where(SourcePredicate::eq("field", x)))
250            .returning(["v"])
251    }
252    #[register]
253    fn q_f32(x: f32) {
254        read_batch()
255            .var_as("v", g().n_where(SourcePredicate::eq("field", x)))
256            .returning(["v"])
257    }
258    #[register]
259    fn q_datetime(ts: DateTime) {
260        read_batch()
261            .var_as("v", g().n_where(SourcePredicate::eq("field", ts)))
262            .returning(["v"])
263    }
264    #[register]
265    fn q_value(val: ParamValue) {
266        read_batch()
267            .var_as("v", g().n_where(SourcePredicate::eq("field", val)))
268            .returning(["v"])
269    }
270    #[register]
271    fn q_object(obj: ParamObject) {
272        read_batch()
273            .var_as("v", g().n_where(SourcePredicate::eq("field", obj)))
274            .returning(["v"])
275    }
276    #[register]
277    fn q_array(items: Vec<String>) {
278        read_batch()
279            .var_as("v", g().n_where(SourcePredicate::eq("field", items)))
280            .returning(["v"])
281    }
282    #[register]
283    fn q_map(map: BTreeMap<String, String>) {
284        read_batch()
285            .var_as("v", g().n_where(SourcePredicate::eq("field", map)))
286            .returning(["v"])
287    }
288    #[register]
289    #[allow(unused_variables)] // bytes coercion errors without reading the value (see test below)
290    fn q_bytes(blob: Vec<u8>) {
291        read_batch()
292            .var_as("v", g().n_where(SourcePredicate::eq("field", blob)))
293            .returning(["v"])
294    }
295
296    #[test]
297    fn param_types_coerce_correctly() {
298        // bool
299        let r = q_bool(true);
300        assert!(matches!(r.request_type, DynamicQueryRequestType::Read));
301        assert!(matches!(
302            r.parameters.as_ref().unwrap().get("flag"),
303            Some(DynamicQueryValue::Bool(true))
304        ));
305        assert!(matches!(
306            r.parameter_types.as_ref().unwrap().get("flag"),
307            Some(QueryParamType::Bool)
308        ));
309
310        // i64
311        let r = q_i64(7);
312        assert!(matches!(
313            r.parameters.as_ref().unwrap().get("num"),
314            Some(DynamicQueryValue::I64(7))
315        ));
316        assert!(matches!(
317            r.parameter_types.as_ref().unwrap().get("num"),
318            Some(QueryParamType::I64)
319        ));
320
321        // f64
322        let r = q_f64(1.5);
323        assert!(matches!(
324            r.parameters.as_ref().unwrap().get("x"),
325            Some(DynamicQueryValue::F64(v)) if *v == 1.5
326        ));
327        assert!(matches!(
328            r.parameter_types.as_ref().unwrap().get("x"),
329            Some(QueryParamType::F64)
330        ));
331
332        // f32
333        let r = q_f32(1.5f32);
334        assert!(matches!(
335            r.parameters.as_ref().unwrap().get("x"),
336            Some(DynamicQueryValue::F32(v)) if *v == 1.5f32
337        ));
338        assert!(matches!(
339            r.parameter_types.as_ref().unwrap().get("x"),
340            Some(QueryParamType::F32)
341        ));
342
343        // DateTime -> rfc3339 string
344        let r = q_datetime(DateTime::from_millis(0));
345        let expected = DateTime::from_millis(0).to_rfc3339().unwrap();
346        assert!(matches!(
347            r.parameters.as_ref().unwrap().get("ts"),
348            Some(DynamicQueryValue::String(s)) if *s == expected
349        ));
350        assert!(matches!(
351            r.parameter_types.as_ref().unwrap().get("ts"),
352            Some(QueryParamType::DateTime)
353        ));
354
355        // ParamValue (PropertyValue)
356        let r = q_value(PropertyValue::I64(5));
357        assert!(matches!(
358            r.parameters.as_ref().unwrap().get("val"),
359            Some(DynamicQueryValue::I64(5))
360        ));
361        assert!(matches!(
362            r.parameter_types.as_ref().unwrap().get("val"),
363            Some(QueryParamType::Value)
364        ));
365
366        // ParamObject (BTreeMap<String, PropertyValue>)
367        let mut obj = BTreeMap::new();
368        obj.insert("k".to_string(), PropertyValue::String("x".to_string()));
369        let r = q_object(obj);
370        assert!(matches!(
371            r.parameters.as_ref().unwrap().get("obj"),
372            Some(DynamicQueryValue::Object(_))
373        ));
374        assert!(matches!(
375            r.parameter_types.as_ref().unwrap().get("obj"),
376            Some(QueryParamType::Object)
377        ));
378
379        // Vec<String> -> Array(String)
380        let r = q_array(vec!["a".to_string(), "b".to_string()]);
381        match r.parameters.as_ref().unwrap().get("items") {
382            Some(DynamicQueryValue::Array(items)) => {
383                assert_eq!(items.len(), 2);
384                assert!(matches!(&items[0], DynamicQueryValue::String(s) if s == "a"));
385                assert!(matches!(&items[1], DynamicQueryValue::String(s) if s == "b"));
386            }
387            other => panic!("expected array, got {other:?}"),
388        }
389        assert!(matches!(
390            r.parameter_types.as_ref().unwrap().get("items"),
391            Some(QueryParamType::Array(inner)) if matches!(**inner, QueryParamType::String)
392        ));
393
394        // BTreeMap<String, String> -> Object
395        let mut map = BTreeMap::new();
396        map.insert("k".to_string(), "v".to_string());
397        let r = q_map(map);
398        assert!(matches!(
399            r.parameters.as_ref().unwrap().get("map"),
400            Some(DynamicQueryValue::Object(_))
401        ));
402        assert!(matches!(
403            r.parameter_types.as_ref().unwrap().get("map"),
404            Some(QueryParamType::Object)
405        ));
406    }
407
408    #[test]
409    #[should_panic(expected = "failed to coerce parameter")]
410    fn bytes_param_panics_on_dynamic_call() {
411        // Bytes params register fine for the stored query, but dynamic coercion is unsupported
412        // and the generated callable panics when invoked.
413        let _ = q_bytes(vec![1, 2, 3]);
414    }
415
416    // ---- Group 2: SourcePredicate JSON — old (literal) vs new (param) -------
417
418    #[test]
419    fn source_predicate_literal_json_is_unchanged() {
420        assert_eq!(
421            sonic_rs::to_string(&SourcePredicate::eq("username", "alice")).unwrap(),
422            r#"{"Eq":["username",{"String":"alice"}]}"#
423        );
424        assert_eq!(
425            sonic_rs::to_string(&SourcePredicate::gt("score", 10i64)).unwrap(),
426            r#"{"Gt":["score",{"I64":10}]}"#
427        );
428        assert_eq!(
429            sonic_rs::to_string(&SourcePredicate::between("age", 18i64, 65i64)).unwrap(),
430            r#"{"Between":["age",{"I64":18},{"I64":65}]}"#
431        );
432    }
433
434    #[test]
435    fn source_predicate_param_json_uses_expr_variants() {
436        assert_eq!(
437            sonic_rs::to_string(&SourcePredicate::eq("username", Expr::param("name"))).unwrap(),
438            r#"{"EqExpr":["username",{"Param":"name"}]}"#
439        );
440        assert_eq!(
441            sonic_rs::to_string(&SourcePredicate::lte("score", Expr::param("max"))).unwrap(),
442            r#"{"LteExpr":["score",{"Param":"max"}]}"#
443        );
444        assert_eq!(
445            sonic_rs::to_string(&SourcePredicate::between("age", Expr::param("lo"), 65i64))
446                .unwrap(),
447            r#"{"BetweenExpr":["age",{"Param":"lo"},{"Constant":{"I64":65}}]}"#
448        );
449    }
450
451    #[test]
452    fn source_predicate_json_round_trips() {
453        for sp in [
454            SourcePredicate::eq("username", "alice"),
455            SourcePredicate::eq("username", Expr::param("name")),
456            SourcePredicate::between("age", Expr::param("lo"), 65i64),
457        ] {
458            let json = sonic_rs::to_string(&sp).unwrap();
459            let back: SourcePredicate = sonic_rs::from_str(&json).unwrap();
460            assert_eq!(sp, back);
461        }
462    }
463
464    // ---- Group 3: full query AST, literal vs param (self-contained) --------
465
466    #[test]
467    fn query_ast_literal_vs_param_json() {
468        let literal = read_batch()
469            .var_as(
470                "user",
471                g().n_where(SourcePredicate::eq("username", "alice")),
472            )
473            .returning(["user"]);
474        let literal_json = sonic_rs::to_string(&literal).unwrap();
475        assert!(
476            literal_json.contains(r#"{"NWhere":{"Eq":["username",{"String":"alice"}]}}"#),
477            "literal NWhere step changed shape: {literal_json}"
478        );
479        assert!(!literal_json.contains("EqExpr"));
480
481        let param = read_batch()
482            .var_as(
483                "user",
484                g().n_where(SourcePredicate::eq("username", Expr::param("name"))),
485            )
486            .returning(["user"]);
487        let param_json = sonic_rs::to_string(&param).unwrap();
488        assert!(
489            param_json.contains(r#"{"NWhere":{"EqExpr":["username",{"Param":"name"}]}}"#),
490            "param NWhere step missing EqExpr/Param: {param_json}"
491        );
492    }
493}
494
495#[cfg(test)]
496mod client_tests {
497    //! Tests for the `Client` / `QueryBuilder` request-building surface. These
498    //! exercise everything up to (but not including) the network round-trip, so
499    //! they need no running Helix instance. As a child module of the crate root
500    //! they can read the builder's private fields directly.
501    use super::*;
502    use serde::Deserialize;
503
504    #[derive(Deserialize)]
505    struct Resp;
506
507    fn sample_request() -> DynamicQueryRequest {
508        DynamicQueryRequest::read(
509            read_batch()
510                .var_as(
511                    "user",
512                    g().n_where(SourcePredicate::eq("username", "alice")),
513                )
514                .returning(["user"]),
515        )
516    }
517
518    // ---- Client construction ------------------------------------------------
519
520    #[test]
521    fn new_defaults_to_localhost() {
522        let client = Client::new(None).unwrap();
523        assert_eq!(client.url.as_str(), "http://localhost:6969/v1/query");
524        assert!(client.api_key.is_none());
525    }
526
527    #[test]
528    fn new_parses_custom_url() {
529        let client = Client::new(Some("https://cluster.helix-db.com")).unwrap();
530        assert_eq!(client.url.as_str(), "https://cluster.helix-db.com/v1/query");
531    }
532
533    #[test]
534    fn new_rejects_invalid_url() {
535        let err = Client::new(Some("not a url")).unwrap_err();
536        assert!(matches!(err, HelixError::InvalidURL(_)));
537    }
538
539    #[test]
540    fn with_api_key_sets_and_clears() {
541        let client = Client::new(None).unwrap().with_api_key(Some("hx_secret"));
542        assert_eq!(client.api_key.as_deref(), Some("hx_secret"));
543
544        let cleared = client.with_api_key(None);
545        assert!(cleared.api_key.is_none());
546    }
547
548    // ---- Header assembly ----------------------------------------------------
549
550    #[test]
551    fn query_builder_starts_with_only_content_type() {
552        let client = Client::new(None).unwrap();
553        let builder = client.query::<Resp>();
554        assert_eq!(
555            builder.headers[0],
556            Some(("Content-Type", "application/json"))
557        );
558        assert!(builder.headers[1..].iter().all(Option::is_none));
559    }
560
561    #[test]
562    fn header_toggles_populate_slots() {
563        let client = Client::new(None).unwrap();
564        let builder = client
565            .query::<Resp>()
566            .writer_only()
567            .warm_only()
568            .should_await_durability(true);
569        assert_eq!(builder.headers[1], Some(("x-helix-require-writer", "true")));
570        assert_eq!(builder.headers[2], Some(("x-helix-warm", "true")));
571        assert_eq!(builder.headers[3], Some(("x-helix-await-durable", "true")));
572    }
573
574    #[test]
575    fn should_await_durability_false_sends_false() {
576        let client = Client::new(None).unwrap();
577        let builder = client.query::<Resp>().should_await_durability(false);
578        assert_eq!(builder.headers[3], Some(("x-helix-await-durable", "false")));
579    }
580
581    // ---- Query type + body --------------------------------------------------
582
583    #[test]
584    fn dynamic_query_sets_query_type() {
585        let client = Client::new(None).unwrap();
586        let request = client.query::<Resp>().dynamic_query(sample_request());
587        assert!(matches!(request.request.query_type, QueryType::Dynamic(_)));
588    }
589
590    #[test]
591    fn stored_query_sets_query_type() {
592        let client = Client::new(None).unwrap();
593        let request = client.query::<Resp>().stored_query("add_user".to_string());
594        assert!(
595            matches!(&request.request.query_type, QueryType::Stored(name) if name == "add_user")
596        );
597    }
598
599    #[derive(serde::Serialize)]
600    struct Payload {
601        name: String,
602    }
603
604    #[test]
605    fn body_serializes_payload() {
606        let client = Client::new(None).unwrap();
607        let payload = Payload {
608            name: "alice".to_string(),
609        };
610        let builder = client.query::<Resp>().body(&payload).unwrap();
611        assert_eq!(builder.body, Some(sonic_rs::to_vec(&payload).unwrap()));
612    }
613
614    // ---- Request routing (exercises the real `send()` path) -----------------
615
616    #[derive(serde::Deserialize)]
617    struct EmptyResp {}
618
619    /// Spawn a one-shot HTTP server on a random port. Returns its base URL and a
620    /// handle that resolves to the request-target (path) of the first request.
621    async fn spawn_capture_server() -> (String, tokio::task::JoinHandle<String>) {
622        use tokio::io::{AsyncReadExt, AsyncWriteExt};
623        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
624        let base = format!("http://{}", listener.local_addr().unwrap());
625        let handle = tokio::spawn(async move {
626            let (mut socket, _) = listener.accept().await.unwrap();
627            let mut buf = [0u8; 4096];
628            let n = socket.read(&mut buf).await.unwrap();
629            let request_line = String::from_utf8_lossy(&buf[..n])
630                .lines()
631                .next()
632                .unwrap()
633                .to_string();
634            // `METHOD <target> HTTP/1.1` -> the target.
635            let target = request_line.split_whitespace().nth(1).unwrap().to_string();
636            let resp = "HTTP/1.1 200 OK\r\nContent-Length: 2\r\nConnection: close\r\n\r\n{}";
637            socket.write_all(resp.as_bytes()).await.unwrap();
638            target
639        });
640        (base, handle)
641    }
642
643    #[tokio::test]
644    async fn dynamic_query_posts_to_v1_query() {
645        let (base, handle) = spawn_capture_server().await;
646        let client = Client::new(Some(&base)).unwrap();
647        let _: EmptyResp = client
648            .query()
649            .dynamic_query(sample_request())
650            .send()
651            .await
652            .unwrap();
653        assert_eq!(handle.await.unwrap(), "/v1/query");
654    }
655
656    #[tokio::test]
657    async fn stored_query_posts_to_named_route() {
658        let (base, handle) = spawn_capture_server().await;
659        let client = Client::new(Some(&base)).unwrap();
660        let _: EmptyResp = client
661            .query()
662            .stored_query("add_user".to_string())
663            .send()
664            .await
665            .unwrap();
666        assert_eq!(handle.await.unwrap(), "/v1/query/add_user");
667    }
668}