use std::sync::Arc;
use async_trait::async_trait;
use crate::queries::output_state::{FetchError, OutboxStream, SnapshotStream};
use crate::queries::Query;
use crate::reactions::checkpoint::ReactionCheckpoint;
use crate::state_store::StateStoreProvider;
/// Source of bootstrap data and checkpoint storage that a
/// [`BootstrapContext`] forwards all of its operations to.
///
/// The trait is hidden from generated documentation; a boxed implementation
/// can be supplied through [`BootstrapContext::from_backend`].
/// Implementations must be `Send + Sync`.
#[async_trait]
#[doc(hidden)]
pub trait BootstrapBackend: Send + Sync {
/// Returns a [`SnapshotStream`], or a [`FetchError`] if it cannot be fetched.
async fn fetch_snapshot(&self) -> Result<SnapshotStream, FetchError>;
/// Returns an [`OutboxStream`] for the given `after_sequence`, or a
/// [`FetchError`] if it cannot be fetched.
///
/// NOTE(review): presumably only entries with a sequence greater than
/// `after_sequence` are yielded — confirm against the `Query` implementation.
async fn fetch_outbox(&self, after_sequence: u64) -> Result<OutboxStream, FetchError>;
/// Reads the stored [`ReactionCheckpoint`], if any.
///
/// `Ok(None)` means no checkpoint was available.
async fn read_checkpoint(&self) -> anyhow::Result<Option<ReactionCheckpoint>>;
/// Stores `checkpoint`, returning an error if it could not be written.
async fn write_checkpoint(&self, checkpoint: &ReactionCheckpoint) -> anyhow::Result<()>;
}
/// [`BootstrapBackend`] that fetches from a [`Query`] held in this process and
/// reads/writes checkpoints through an optional state store.
struct InProcessBackend {
/// Query that snapshot and outbox fetches are forwarded to.
query: Arc<dyn Query>,
/// Reaction identifier passed to the checkpoint read/write helpers.
reaction_id: String,
/// Query identifier passed to the checkpoint read/write helpers.
query_id: String,
/// Store used for checkpoints. When `None`, reading a checkpoint yields
/// `Ok(None)` and writing a checkpoint fails with an error.
state_store: Option<Arc<dyn StateStoreProvider>>,
}
#[async_trait]
impl BootstrapBackend for InProcessBackend {
    /// Asks the wrapped query for its snapshot and wraps it in a
    /// [`SnapshotStream`].
    ///
    /// # Errors
    ///
    /// Propagates the error from the query's `fetch_snapshot` call.
    async fn fetch_snapshot(&self) -> Result<SnapshotStream, FetchError> {
        Ok(SnapshotStream::from_snapshot(
            self.query.fetch_snapshot().await?,
        ))
    }

    /// Asks the wrapped query for its outbox, passing `after_sequence`
    /// through unchanged, and wraps the result in an [`OutboxStream`].
    ///
    /// # Errors
    ///
    /// Propagates the error from the query's `fetch_outbox` call.
    async fn fetch_outbox(&self, after_sequence: u64) -> Result<OutboxStream, FetchError> {
        Ok(OutboxStream::from_outbox(
            self.query.fetch_outbox(after_sequence).await?,
        ))
    }

    /// Reads the checkpoint for this backend's reaction/query pair from the
    /// state store.
    ///
    /// Returns `Ok(None)` without touching any store when no state store is
    /// configured.
    ///
    /// # Errors
    ///
    /// Propagates any error from the checkpoint read helper.
    async fn read_checkpoint(&self) -> anyhow::Result<Option<ReactionCheckpoint>> {
        // A missing store is not an error for reads: there is simply nothing saved.
        let provider = match &self.state_store {
            Some(provider) => provider,
            None => return Ok(None),
        };

        crate::reactions::checkpoint::read_checkpoint(
            provider.as_ref(),
            &self.reaction_id,
            &self.query_id,
        )
        .await
    }

    /// Writes `checkpoint` for this backend's reaction/query pair to the
    /// state store.
    ///
    /// # Errors
    ///
    /// Fails when no state store is configured, and propagates any error
    /// from the checkpoint write helper.
    async fn write_checkpoint(&self, checkpoint: &ReactionCheckpoint) -> anyhow::Result<()> {
        // Unlike reads, a write with nowhere to go is reported to the caller.
        let provider = match &self.state_store {
            Some(provider) => provider,
            None => {
                return Err(anyhow::anyhow!(
                    "No state store configured — cannot write checkpoint"
                ));
            }
        };

        crate::reactions::checkpoint::write_checkpoint(
            provider.as_ref(),
            &self.reaction_id,
            &self.query_id,
            checkpoint,
        )
        .await
    }
}
/// Handle for fetching snapshot/outbox streams and reading or writing
/// checkpoints; every operation is forwarded to a boxed [`BootstrapBackend`].
pub struct BootstrapContext {
/// Identifier of the query this context was created for.
pub query_id: String,
/// Flag supplied by the creator of the context.
///
/// NOTE(review): presumably signals that bootstrapping starts from a reset
/// state rather than resuming — confirm against the caller.
pub is_reset: bool,
/// Backend that performs the actual fetches and checkpoint I/O.
backend: Box<dyn BootstrapBackend>,
}
impl BootstrapContext {
    /// Creates a context backed by an in-process query.
    ///
    /// * `query_id` – stored on the context and also handed to the backend.
    /// * `is_reset` – stored on the context as-is.
    /// * `query` – query that snapshot and outbox fetches are forwarded to.
    /// * `reaction_id` – reaction identifier used for checkpoint reads/writes.
    /// * `state_store` – optional store for checkpoints; with `None`,
    ///   checkpoint reads return `Ok(None)` and checkpoint writes fail.
    pub fn new(
        query_id: String,
        is_reset: bool,
        query: Arc<dyn Query>,
        reaction_id: String,
        state_store: Option<Arc<dyn StateStoreProvider>>,
    ) -> Self {
        // The backend needs its own copy of the id; the original moves into the context.
        let in_process: Box<dyn BootstrapBackend> = Box::new(InProcessBackend {
            query_id: query_id.clone(),
            reaction_id,
            query,
            state_store,
        });

        Self::from_backend(query_id, is_reset, in_process)
    }

    /// Creates a context around an arbitrary, already-boxed backend.
    ///
    /// Hidden from generated documentation.
    #[doc(hidden)]
    pub fn from_backend(
        query_id: String,
        is_reset: bool,
        backend: Box<dyn BootstrapBackend>,
    ) -> Self {
        Self {
            backend,
            is_reset,
            query_id,
        }
    }

    /// Fetches a [`SnapshotStream`] from the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend's [`FetchError`] unchanged.
    pub async fn fetch_snapshot(&self) -> Result<SnapshotStream, FetchError> {
        self.backend.fetch_snapshot().await
    }

    /// Fetches an [`OutboxStream`] from the backend, passing `after_sequence`
    /// through unchanged.
    ///
    /// # Errors
    ///
    /// Returns the backend's [`FetchError`] unchanged.
    pub async fn fetch_outbox(&self, after_sequence: u64) -> Result<OutboxStream, FetchError> {
        self.backend.fetch_outbox(after_sequence).await
    }

    /// Reads the stored checkpoint from the backend; `Ok(None)` means none
    /// was available.
    ///
    /// # Errors
    ///
    /// Returns the backend's error unchanged.
    pub async fn read_checkpoint(&self) -> anyhow::Result<Option<ReactionCheckpoint>> {
        self.backend.read_checkpoint().await
    }

    /// Writes `checkpoint` through the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend's error unchanged. For a context built with
    /// [`BootstrapContext::new`] this includes the case where no state store
    /// was supplied.
    pub async fn write_checkpoint(&self, checkpoint: &ReactionCheckpoint) -> anyhow::Result<()> {
        self.backend.write_checkpoint(checkpoint).await
    }
}