springql_core_release_test/api/
spring_pipeline.rs

1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3use crate::{
4    api::{error::Result, spring_source_row::SpringSourceRow, SpringConfig, SpringSinkRow},
5    connection::Connection,
6};
7
8/// Pipeline.
9#[derive(Debug)]
10pub struct SpringPipeline(Connection);
11
12impl SpringPipeline {
13    /// Creates and open an in-process stream pipeline.
14    pub fn new(config: &SpringConfig) -> Result<Self> {
15        let conn = Connection::new(config);
16        Ok(Self(conn))
17    }
18
19    /// Execute commands (DDL).
20    ///
21    /// # Failure
22    ///
23    /// - [SpringError::Sql](crate::api::error::SpringError::Sql) when:
24    ///   - Invalid SQL syntax.
25    ///   - Refers to undefined objects (streams, pumps, etc)
26    ///   - Other semantic errors.
27    /// - [SpringError::InvalidOption](crate::api::error::SpringError::Sql) when:
28    ///   - `OPTIONS` in `CREATE` statement includes invalid key or value.
29    pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
30        self.0.command(sql.as_ref())
31    }
32
33    /// Pop a row from an in memory queue. This is a blocking function.
34    ///
35    /// **Do not call this function from threads.**
36    /// If you need to pop from multiple in-memory queues using threads, use `pop_non_blocking()`.
37    /// See: <https://github.com/SpringQL/SpringQL/issues/125>
38    ///
39    /// # Failure
40    ///
41    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
42    ///   - queue named `queue` does not exist.
43    pub fn pop(&self, queue: &str) -> Result<SpringSinkRow> {
44        self.0.pop(queue).map(SpringSinkRow::new)
45    }
46
47    /// Pop a row from an in memory queue. This is a non-blocking function.
48    ///
49    /// # Returns
50    ///
51    /// - `Ok(Some)` when at least a row is in the queue.
52    /// - `None` when no row is in the queue.
53    ///
54    /// # Failure
55    ///
56    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
57    ///   - queue named `queue` does not exist.
58    pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringSinkRow>> {
59        self.0
60            .pop_non_blocking(queue)
61            .map(|opt_row| opt_row.map(SpringSinkRow::new))
62    }
63
64    /// Push a row into an in memory queue. This is a non-blocking function.
65    ///
66    /// # Failure
67    ///
68    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
69    ///   - queue named `queue` does not exist.
70    pub fn push(&self, queue: &str, row: SpringSourceRow) -> Result<()> {
71        self.0.push(queue, row.into_schemaless_row()?)
72    }
73}