Skip to main content

datapress_client/
blocking.rs

1//! Synchronous, blocking wrapper around the async [`crate::Client`].
2//!
3//! Backed by a private current-thread Tokio runtime. Enable with the
4//! `blocking` feature. Convenient for scripts, tests, and the Python
5//! bindings, which want a plain call-and-wait API.
6
7use crate::error::Result;
8use crate::models::{Predicate, QueryRequest, QueryResponse, SqlResponse};
9use serde_json::Value as JsonValue;
10
11/// Blocking DataPress client.
12///
13/// Each instance owns a single-threaded Tokio runtime used to drive the
14/// async client to completion. Not `Clone` (the runtime is not shared);
15/// create one per thread.
16pub struct Client {
17    inner: crate::Client,
18    rt: tokio::runtime::Runtime,
19}
20
21impl Client {
22    /// Construct a blocking client with defaults for `base_url`.
23    pub fn new(base_url: impl Into<String>) -> Result<Self> {
24        Self::from_async(crate::Client::new(base_url)?)
25    }
26
27    /// Wrap an already-built async [`crate::Client`].
28    pub fn from_async(inner: crate::Client) -> Result<Self> {
29        let rt = tokio::runtime::Builder::new_current_thread()
30            .enable_all()
31            .build()
32            .map_err(|e| crate::ClientError::Decode(format!("runtime build failed: {e}")))?;
33        Ok(Self { inner, rt })
34    }
35
36    /// Start a [`crate::ClientBuilder`]; pass the result to
37    /// [`Client::from_async`] after `.build()`.
38    pub fn builder(base_url: impl Into<String>) -> crate::ClientBuilder {
39        crate::ClientBuilder::new(base_url)
40    }
41
42    /// Liveness probe.
43    pub fn healthz(&self) -> Result<JsonValue> {
44        self.rt.block_on(self.inner.healthz())
45    }
46
47    /// Readiness probe.
48    pub fn readyz(&self) -> Result<JsonValue> {
49        self.rt.block_on(self.inner.readyz())
50    }
51
52    /// List registered dataset names.
53    pub fn datasets(&self) -> Result<Vec<String>> {
54        self.rt.block_on(self.inner.datasets())
55    }
56
57    /// Fetch the schema description for `dataset`.
58    pub fn schema(&self, dataset: &str) -> Result<JsonValue> {
59        self.rt.block_on(self.inner.schema(dataset))
60    }
61
62    /// Count matching rows.
63    pub fn count(&self, dataset: &str, predicates: &[Predicate]) -> Result<u64> {
64        self.rt.block_on(self.inner.count(dataset, predicates))
65    }
66
67    /// Run a structured query, returning the JSON envelope.
68    pub fn query_json(&self, dataset: &str, request: &QueryRequest) -> Result<QueryResponse> {
69        self.rt.block_on(self.inner.query_json(dataset, request))
70    }
71
72    /// Run a raw read-only SQL statement.
73    pub fn sql(&self, sql: impl Into<String>, max_rows: Option<u64>) -> Result<SqlResponse> {
74        self.rt.block_on(self.inner.sql(sql, max_rows))
75    }
76
77    /// Trigger an in-place reload of `dataset`.
78    pub fn reload(&self, dataset: &str) -> Result<JsonValue> {
79        self.rt.block_on(self.inner.reload(dataset))
80    }
81
82    /// Run a structured query asking for Arrow IPC, returning the raw
83    /// stream bytes.
84    pub fn query_arrow_bytes(
85        &self,
86        dataset: &str,
87        request: &QueryRequest,
88    ) -> Result<bytes::Bytes> {
89        self.rt
90            .block_on(self.inner.query_arrow_bytes(dataset, request))
91    }
92
93    /// Run a structured query and decode the Arrow IPC response into
94    /// record batches.
95    #[cfg(feature = "arrow")]
96    pub fn query_arrow(
97        &self,
98        dataset: &str,
99        request: &QueryRequest,
100    ) -> Result<Vec<arrow::record_batch::RecordBatch>> {
101        self.rt.block_on(self.inner.query_arrow(dataset, request))
102    }
103
104    /// Access the underlying async client (shares the same HTTP pool).
105    pub fn inner(&self) -> &crate::Client {
106        &self.inner
107    }
108}