hyperdb-api 0.1.0

Pure Rust API for Hyper database
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! gRPC-based connection to Hyper databases.
//!
//! This module provides [`GrpcConnection`] for query-only access to Hyper databases
//! via gRPC, with results returned in Apache Arrow IPC format.
//!
//! # Differences from TCP Connection
//!
//! gRPC connections have these limitations compared to TCP:
//!
//! - **Read-only**: Only SELECT queries are supported
//! - **No DDL/DML**: CREATE, DROP, INSERT, UPDATE, DELETE not supported
//! - **No COPY**: Bulk insertion via COPY protocol not available
//! - **No prepared statements**: Parameters not supported
//!
//! Use [`Connection`](crate::Connection) for full read-write access.
//!
//! # Transfer Modes
//!
//! gRPC queries support three transfer modes via
//! [`TransferMode`](crate::grpc::TransferMode), which control how
//! result data flows between server and client:
//!
//! - **`Sync`** — All results are returned in a single response message.
//!   Simple but limited to a 100-second server timeout, making it unsuitable
//!   for long-running queries or large result sets.
//!
//! - **`Async`** — The initial response contains only a header (query ID);
//!   results must be fetched separately via `GetQueryResult`. Useful when
//!   the client needs to decouple query submission from result consumption.
//!
//! - **`Adaptive`** (default, recommended) — The first chunk of results is
//!   returned inline with the response; remaining chunks are streamed via
//!   `GetQueryResult`. This avoids the sync timeout limit while keeping
//!   latency low for small results that fit in a single chunk.
//!
//! # Example
//!
//! ```no_run
//! use hyperdb_api::grpc::{GrpcConnection, GrpcConnectionAsync};
//!
//! // Async usage
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let mut conn = GrpcConnectionAsync::connect(
//!         "http://localhost:7484",
//!         "my_database.hyper"
//!     ).await?;
//!
//!     // Execute a query
//!     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
//!
//!     // Or get a structured result
//!     let result = conn.execute_query("SELECT id, name FROM users").await?;
//!     println!("Got {} bytes of Arrow data", result.arrow_data().len());
//!
//!     Ok(())
//! }
//!
//! // Sync usage
//! fn main_sync() -> Result<(), Box<dyn std::error::Error>> {
//!     let mut conn = GrpcConnection::connect(
//!         "http://localhost:7484",
//!         "my_database.hyper"
//!     )?;
//!
//!     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users")?;
//!     Ok(())
//! }
//! ```

use crate::error::Result;

// Re-export types from hyperdb_api_core::client::grpc for convenience
pub(crate) use hyperdb_api_core::client::grpc::{
    GrpcClient, GrpcClientSync, GrpcConfig, GrpcQueryResult,
};

/// A gRPC connection to a Hyper database (query-only).
///
/// Unlike [`Connection`](crate::Connection), this provides read-only access via gRPC
/// with results in Apache Arrow IPC format. This is useful for:
///
/// - Load-balanced deployments where gRPC provides better routing
/// - Integration with Arrow-based data pipelines
/// - Scenarios where HTTP/2 benefits are needed
///
/// # Limitations
///
/// gRPC connections only support SELECT queries. Attempting to execute
/// DDL (CREATE, DROP), DML (INSERT, UPDATE, DELETE), or use features like
/// prepared statements will result in an error.
///
/// # Async vs Sync
///
/// This struct provides both async and sync APIs:
///
/// - `connect()` / `execute_query()` - blocking (uses internal tokio runtime)
/// - `connect_async()` / `execute_query_async()` - async (requires tokio runtime)
///
/// For applications already using tokio, the async methods are preferred.
#[derive(Debug)]
pub struct GrpcConnection {
    /// The underlying gRPC client (sync wrapper)
    client: GrpcClientSync,
    /// Database path
    database: Option<String>,
}

impl GrpcConnection {
    /// Connects to a Hyper server via gRPC (blocking).
    ///
    /// # Arguments
    ///
    /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
    /// * `database_path` - Path to the database to query
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::GrpcConnection;
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let conn = GrpcConnection::connect(
    ///     "http://localhost:7484",
    ///     "my_database.hyper"
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] wrapping a
    /// `hyperdb_api_core::client::Error` if the HTTP/2 channel cannot be established
    /// (endpoint unreachable, TLS handshake failure, auth rejection).
    pub fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
        let config = GrpcConfig::new(endpoint).database(database_path);
        let client = GrpcClientSync::connect(config)?;

        Ok(GrpcConnection {
            client,
            database: Some(database_path.to_string()),
        })
    }

    /// Connects to a Hyper server via gRPC with custom configuration (blocking).
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::{GrpcConnection, GrpcConfig};
    /// # use std::time::Duration;
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = GrpcConfig::new("http://localhost:7484")
    ///     .database("my_database.hyper")
    ///     .connect_timeout(Duration::from_secs(10))
    ///     .header("x-custom-header", "value");
    ///
    /// let conn = GrpcConnection::connect_with_config(config)?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
    /// established with the supplied configuration.
    pub fn connect_with_config(config: GrpcConfig) -> Result<Self> {
        let database = config.database_path().map(std::string::ToString::to_string);
        let client = GrpcClientSync::connect(config)?;

        Ok(GrpcConnection { client, database })
    }

    /// Executes a SQL query and returns raw Arrow IPC bytes (blocking).
    ///
    /// This is the most efficient method when you need the raw Arrow data
    /// for processing with an Arrow library.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::GrpcConnection;
    /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
    /// let arrow_bytes = conn.execute_query_to_arrow("SELECT * FROM users")?;
    ///
    /// // Process with arrow crate using the zero-copy helper
    /// let rowset = hyperdb_api::ArrowRowset::from_bytes(arrow_bytes)?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
    /// or if the HTTP/2 channel fails mid-stream.
    pub fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
        Ok(self.client.execute_query_to_arrow(sql)?)
    }

    /// Executes a SQL query and returns a structured result (blocking).
    ///
    /// The result contains the Arrow IPC data along with metadata about
    /// the query execution.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::GrpcConnection;
    /// # fn example(conn: &mut GrpcConnection) -> Result<(), Box<dyn std::error::Error>> {
    /// let result = conn.execute_query("SELECT id, name FROM users")?;
    /// println!("Query ID: {:?}", result.query_id());
    /// println!("Columns: {}", result.column_count());
    /// let arrow_data = result.into_arrow_data();
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the gRPC server rejects the query
    /// or the HTTP/2 channel fails.
    pub fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
        Ok(self.client.execute_query(sql)?)
    }

    /// Cancels an in-flight gRPC query by its `query_id` (blocking).
    ///
    /// This is the gRPC analogue of PG wire's `CancelRequest`. Unlike PG
    /// wire — where the cancel opens a separate TCP connection — the
    /// gRPC cancel rides the existing HTTP/2 channel as a regular RPC,
    /// so it reuses this connection's channel, database routing, and
    /// custom headers.
    ///
    /// # When you have a `query_id`
    ///
    /// The server assigns a `query_id` for ASYNC-mode queries
    /// (long-running queries that the client polls).  Read it from
    /// [`GrpcQueryResult::query_id`] after an async-mode execution.
    /// SYNC-mode queries typically complete before a cancel would be
    /// useful — drop the in-flight future instead.
    ///
    /// Cancellation is best-effort: a successful `Ok(())` return means
    /// the server acknowledged the cancel, not that the query had not
    /// already finished. Errors indicate a transport-level failure
    /// (channel closed, network error, auth expired) — useful for
    /// metrics, retry logic, or "cancel failed" UX.
    ///
    /// # Fallible by design
    ///
    /// The `Result<()>` return is the **explicit user-facing cancel
    /// API** and is distinct from the
    /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait, which requires
    /// an infallible `cancel(&self)` method with no arguments. A
    /// `GrpcConnection` cannot implement `Cancellable` directly: the
    /// trait's signature has nowhere to pass a `query_id`, and gRPC
    /// connections can carry many concurrent queries (so there is no
    /// unambiguous "the" query to cancel the way there is on a PG wire
    /// connection). If you need `Cancellable`-style fire-and-forget
    /// cancel for a future gRPC streaming result type, it will live
    /// on a per-query handle that wraps this method and swallows
    /// errors — mirroring
    /// [`impl Cancellable for hyperdb_api_core::client::Client`](hyperdb_api_core::client::Cancellable).
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::GrpcConnection;
    /// # fn example(conn: &mut GrpcConnection, query_id: &str) -> hyperdb_api::Result<()> {
    /// conn.cancel_query(query_id)?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] on transport-level failures (channel
    /// closed, network error, auth expired). An unknown or already-finished
    /// `query_id` is not an error — the server returns `Ok`.
    pub fn cancel_query(&mut self, query_id: &str) -> Result<()> {
        Ok(self.client.cancel_query(query_id)?)
    }

    /// Returns the database path, if one is attached.
    #[must_use]
    pub fn database(&self) -> Option<&str> {
        self.database.as_deref()
    }

    /// Returns the gRPC configuration.
    pub fn config(&self) -> &GrpcConfig {
        self.client.config()
    }

    /// Closes the connection (blocking).
    ///
    /// Note: The connection is automatically closed when dropped.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
    /// be shut down cleanly.
    pub fn close(self) -> Result<()> {
        Ok(self.client.close()?)
    }
}

/// Async gRPC connection to a Hyper database (query-only).
///
/// This is the async version of [`GrpcConnection`], designed for use with
/// tokio-based async applications.
///
/// # Example
///
/// ```no_run
/// # use hyperdb_api::grpc::GrpcConnectionAsync;
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut conn = GrpcConnectionAsync::connect(
///         "http://localhost:7484",
///         "my_database.hyper"
///     ).await?;
///
///     let arrow_data = conn.execute_query_to_arrow("SELECT * FROM users").await?;
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct GrpcConnectionAsync {
    /// The underlying async gRPC client
    client: GrpcClient,
    /// Database path
    database: Option<String>,
}

impl GrpcConnectionAsync {
    /// Connects to a Hyper server via gRPC (async).
    ///
    /// # Arguments
    ///
    /// * `endpoint` - The gRPC endpoint URL (e.g., "<http://localhost:7484>")
    /// * `database_path` - Path to the database to query
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
    /// established (endpoint unreachable, TLS handshake failure).
    pub async fn connect(endpoint: &str, database_path: &str) -> Result<Self> {
        let config = GrpcConfig::new(endpoint).database(database_path);
        let client = GrpcClient::connect(config).await?;

        Ok(GrpcConnectionAsync {
            client,
            database: Some(database_path.to_string()),
        })
    }

    /// Connects to a Hyper server via gRPC with custom configuration (async).
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the HTTP/2 channel cannot be
    /// established with the supplied configuration.
    pub async fn connect_with_config(config: GrpcConfig) -> Result<Self> {
        let database = config.database_path().map(std::string::ToString::to_string);
        let client = GrpcClient::connect(config).await?;

        Ok(GrpcConnectionAsync { client, database })
    }

    /// Executes a SQL query and returns raw Arrow IPC bytes (async).
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the server rejects the query or the
    /// HTTP/2 channel fails mid-stream.
    pub async fn execute_query_to_arrow(&mut self, sql: &str) -> Result<bytes::Bytes> {
        Ok(self.client.execute_query_to_arrow(sql).await?)
    }

    /// Executes a SQL query and returns a structured result (async).
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the server rejects the query or the
    /// HTTP/2 channel fails.
    pub async fn execute_query(&mut self, sql: &str) -> Result<GrpcQueryResult> {
        Ok(self.client.execute_query(sql).await?)
    }

    /// Cancels an in-flight gRPC query by its `query_id` (async).
    ///
    /// See [`GrpcConnection::cancel_query`] for full semantics, including
    /// the "Fallible by design" discussion of why this returns
    /// `Result<()>` and why it is *not* an implementation of the
    /// [`Cancellable`](hyperdb_api_core::client::Cancellable) trait. The async
    /// variant avoids blocking the current thread; both variants route
    /// the cancel over the same channel used for queries, carrying this
    /// connection's database routing and custom headers.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::grpc::GrpcConnectionAsync;
    /// # async fn example(conn: &mut GrpcConnectionAsync, query_id: &str)
    /// #     -> hyperdb_api::Result<()> {
    /// conn.cancel_query(query_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// See [`GrpcConnection::cancel_query`].
    pub async fn cancel_query(&mut self, query_id: &str) -> Result<()> {
        Ok(self.client.cancel_query(query_id).await?)
    }

    /// Returns the database path, if one is attached.
    #[must_use]
    pub fn database(&self) -> Option<&str> {
        self.database.as_deref()
    }

    /// Returns the gRPC configuration.
    pub fn config(&self) -> &GrpcConfig {
        self.client.config()
    }

    /// Closes the connection (async).
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::Client`] if the underlying HTTP/2 channel cannot
    /// be shut down cleanly.
    pub async fn close(self) -> Result<()> {
        Ok(self.client.close().await?)
    }
}

// ============================================================================
// Streaming chunk source adapter
// ============================================================================

/// Adapter from a [`hyperdb_api_core::client::grpc::GrpcChunkStreamSync`] to
/// [`crate::arrow_result::ChunkSource`].
///
/// Used internally by [`Connection::execute_query`](crate::Connection::execute_query)
/// on a gRPC transport so the high-level `Rowset` decodes record batches
/// lazily instead of buffering the whole Arrow IPC payload in memory.
pub(crate) struct GrpcChunkStreamSource {
    inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync,
}

impl GrpcChunkStreamSource {
    pub(crate) fn new(inner: hyperdb_api_core::client::grpc::GrpcChunkStreamSync) -> Self {
        GrpcChunkStreamSource { inner }
    }
}

impl crate::arrow_result::ChunkSource for GrpcChunkStreamSource {
    fn next_chunk(&mut self) -> crate::Result<Option<bytes::Bytes>> {
        Ok(self.inner.next_chunk()?)
    }
}