use crate::{
api::{error::Result, spring_source_row::SpringSourceRow, SpringConfig, SpringSinkRow},
connection::Connection,
};
#[derive(Debug)]
pub struct SpringPipeline(Connection);
impl SpringPipeline {
pub fn new(config: &SpringConfig) -> Result<Self> {
let conn = Connection::new(config);
Ok(Self(conn))
}
pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
self.0.command(sql.as_ref())
}
pub fn pop(&self, queue: &str) -> Result<SpringSinkRow> {
self.0.pop(queue).map(SpringSinkRow::new)
}
pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringSinkRow>> {
self.0
.pop_non_blocking(queue)
.map(|opt_row| opt_row.map(SpringSinkRow::new))
}
pub fn push(&self, queue: &str, row: SpringSourceRow) -> Result<()> {
self.0.push(queue, row.into_schemaless_row()?)
}
}