use anyhow::anyhow;
use crate::{
api::error::{foreign_info::ForeignInfo, Result, SpringError},
api::SpringSourceReaderConfig,
pipeline::{InMemoryQueueOptions, Options, QueueName},
stream_engine::{
autonomous_executor::{row::SourceRow, task::source_task::source_reader::SourceReader},
in_memory_queue_repository::InMemoryQueueRepository,
},
};
/// [`SourceReader`] implementation that takes its rows from an in-memory queue.
///
/// The wrapped [`QueueName`] is the key used to look the queue up in
/// [`InMemoryQueueRepository`] each time a row is requested.
#[derive(Debug)]
pub struct InMemoryQueueSourceReader(QueueName);
impl SourceReader for InMemoryQueueSourceReader {
fn start(options: &Options, _config: &SpringSourceReaderConfig) -> Result<Self> {
let options = InMemoryQueueOptions::try_from(options)?;
let queue_name = options.queue_name;
InMemoryQueueRepository::instance().create(queue_name.clone())?;
Ok(Self(queue_name))
}
fn next_row(&mut self) -> Result<SourceRow> {
let q = InMemoryQueueRepository::instance().get(&self.0)?;
if let Some(row) = q.pop_non_blocking() {
Ok(SourceRow::Raw(row))
} else {
Err(SpringError::ForeignSourceTimeout {
source: anyhow!("queue is empty"),
foreign_info: ForeignInfo::InMemoryQueue(self.0.clone()),
})
}
}
}