1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::{
    api::{error::Result, spring_source_row::SpringSourceRow, SpringConfig, SpringSinkRow},
    connection::Connection,
};

/// An in-process stream pipeline.
///
/// This is a newtype around a single `Connection`; the methods on this type
/// forward to that connection and convert rows to and from the public row types.
#[derive(Debug)]
pub struct SpringPipeline(Connection);

impl SpringPipeline {
    /// Creates and opens an in-process stream pipeline configured by `config`.
    pub fn new(config: &SpringConfig) -> Result<Self> {
        Ok(Self(Connection::new(config)))
    }

    /// Executes a command (DDL) given as SQL text.
    ///
    /// # Failure
    ///
    /// - [SpringError::Sql](crate::api::error::SpringError::Sql) when:
    ///   - The SQL syntax is invalid.
    ///   - The statement refers to undefined objects (streams, pumps, etc).
    ///   - Other semantic errors occur.
    /// - [SpringError::InvalidOption](crate::api::error::SpringError::InvalidOption) when:
    ///   - `OPTIONS` in a `CREATE` statement includes an invalid key or value.
    pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
        let statement = sql.as_ref();
        self.0.command(statement)
    }

    /// Pops a row from the in-memory queue named `queue`. This function blocks.
    ///
    /// **Do not call this function from threads.**
    /// If you need to pop from multiple in-memory queues using threads, use `pop_non_blocking()`.
    /// See: <https://github.com/SpringQL/SpringQL/issues/125>
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn pop(&self, queue: &str) -> Result<SpringSinkRow> {
        let row = self.0.pop(queue)?;
        Ok(SpringSinkRow::new(row))
    }

    /// Pops a row from the in-memory queue named `queue` without blocking.
    ///
    /// # Returns
    ///
    /// - `Ok(Some)` when at least one row is in the queue.
    /// - `Ok(None)` when no row is in the queue.
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringSinkRow>> {
        let maybe_row = self.0.pop_non_blocking(queue)?;
        Ok(maybe_row.map(SpringSinkRow::new))
    }

    /// Pushes `row` into the in-memory queue named `queue`. This function does not block.
    ///
    /// The row is converted with `into_schemaless_row()` before it is handed to the
    /// connection; a conversion error is returned to the caller as-is.
    ///
    /// # Failure
    ///
    /// - [SpringError::Unavailable](crate::api::error::SpringError::Unavailable) when:
    ///   - queue named `queue` does not exist.
    pub fn push(&self, queue: &str, row: SpringSourceRow) -> Result<()> {
        let schemaless_row = row.into_schemaless_row()?;
        self.0.push(queue, schemaless_row)
    }
}