// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
use crate::{
api::{error::Result, spring_source_row::SpringSourceRow, SpringConfig, SpringSinkRow},
connection::Connection,
};
/// Handle to an in-process stream pipeline.
///
/// This is a newtype around the crate's `Connection`; the wrapped connection is the
/// only state held by a pipeline.
#[derive(Debug)]
pub struct SpringPipeline(Connection);
impl SpringPipeline {
    /// Creates and opens an in-process stream pipeline.
    ///
    /// A new `Connection` is built from `config` and wrapped in the returned pipeline.
    pub fn new(config: &SpringConfig) -> Result<Self> {
        Ok(Self(Connection::new(config)))
    }

    /// Executes a command (DDL) given as SQL text.
    ///
    /// `sql` may be anything that can be viewed as a `&str`; it is forwarded to the
    /// underlying connection.
    ///
    /// # Failure
    ///
    /// - [SpringError::Sql](crate::api::error::SpringError::Sql) when:
    ///   - Invalid SQL syntax.
    ///   - Refers to undefined objects (streams, pumps, etc)
    ///   - Other semantic errors.
    /// - [SpringError::InvalidOption](crate::api::error::SpringError::InvalidOption) when:
    ///   - `OPTIONS` in `CREATE` statement includes invalid key or value.
    pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
        let statement: &str = sql.as_ref();
        self.0.command(statement)
    }

    /// Pops a row from an in-memory queue. This is a blocking function.
    ///
    /// **Do not call this function from threads.**
    /// If you need to pop from multiple in-memory queues using threads, use `pop_non_blocking()`.
    /// See: <https://github.com/SpringQL/SpringQL/issues/125>
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn pop(&self, queue: &str) -> Result<SpringSinkRow> {
        let row = self.0.pop(queue)?;
        Ok(SpringSinkRow::new(row))
    }

    /// Pops a row from an in-memory queue. This is a non-blocking function.
    ///
    /// # Returns
    ///
    /// - `Ok(Some)` when at least one row is in the queue.
    /// - `Ok(None)` when no row is in the queue.
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringSinkRow>> {
        let maybe_row = self.0.pop_non_blocking(queue)?;
        Ok(maybe_row.map(SpringSinkRow::new))
    }

    /// Pushes a row into an in-memory queue. This is a non-blocking function.
    ///
    /// `row` is first converted into a schemaless row; an error from that conversion
    /// is returned to the caller before anything is pushed.
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn push(&self, queue: &str, row: SpringSourceRow) -> Result<()> {
        let schemaless_row = row.into_schemaless_row()?;
        self.0.push(queue, schemaless_row)
    }
}