springql_core/api/spring_pipeline.rs
1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3use crate::{
4 api::{error::Result, spring_source_row::SpringSourceRow, SpringConfig, SpringSinkRow},
5 connection::Connection,
6};
7
8/// Pipeline.
9#[derive(Debug)]
10pub struct SpringPipeline(Connection);
11
12impl SpringPipeline {
13 /// Creates and open an in-process stream pipeline.
14 pub fn new(config: &SpringConfig) -> Result<Self> {
15 let conn = Connection::new(config);
16 Ok(Self(conn))
17 }
18
19 /// Execute commands (DDL).
20 ///
21 /// # Failure
22 ///
23 /// - [SpringError::Sql](crate::api::error::SpringError::Sql) when:
24 /// - Invalid SQL syntax.
25 /// - Refers to undefined objects (streams, pumps, etc)
26 /// - Other semantic errors.
27 /// - [SpringError::InvalidOption](crate::api::error::SpringError::Sql) when:
28 /// - `OPTIONS` in `CREATE` statement includes invalid key or value.
29 pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
30 self.0.command(sql.as_ref())
31 }
32
33 /// Pop a row from an in memory queue. This is a blocking function.
34 ///
35 /// **Do not call this function from threads.**
36 /// If you need to pop from multiple in-memory queues using threads, use `pop_non_blocking()`.
37 /// See: <https://github.com/SpringQL/SpringQL/issues/125>
38 ///
39 /// # Failure
40 ///
41 /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
42 /// - queue named `queue` does not exist.
43 pub fn pop(&self, queue: &str) -> Result<SpringSinkRow> {
44 self.0.pop(queue).map(SpringSinkRow::new)
45 }
46
47 /// Pop a row from an in memory queue. This is a non-blocking function.
48 ///
49 /// # Returns
50 ///
51 /// - `Ok(Some)` when at least a row is in the queue.
52 /// - `None` when no row is in the queue.
53 ///
54 /// # Failure
55 ///
56 /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
57 /// - queue named `queue` does not exist.
58 pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringSinkRow>> {
59 self.0
60 .pop_non_blocking(queue)
61 .map(|opt_row| opt_row.map(SpringSinkRow::new))
62 }
63
64 /// Push a row into an in memory queue. This is a non-blocking function.
65 ///
66 /// # Failure
67 ///
68 /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
69 /// - queue named `queue` does not exist.
70 pub fn push(&self, queue: &str, row: SpringSourceRow) -> Result<()> {
71 self.0.push(queue, row.into_schemaless_row()?)
72 }
73}