use anyhow::Result;
use async_trait::async_trait;
use thiserror::Error;
use crate::channels::*;
use crate::context::SourceRuntimeContext;
/// Typed error describing a source failure.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; the
/// message for each variant comes from its `#[error(...)]` attribute.
///
/// NOTE(review): the `Source` trait methods in this file return
/// `anyhow::Result`, so callers presumably recover this type by downcasting
/// the `anyhow::Error` — confirm against call sites.
#[derive(Error, Debug)]
pub enum SourceError {
/// A resume from position `requested` was asked of source `source_id`, but
/// that position is unavailable.
#[error("Source '{source_id}' cannot resume from position {requested}: position unavailable (earliest available: {earliest_available:?})")]
PositionUnavailable {
/// Identifier of the source; interpolated into the error message.
source_id: String,
/// The position the caller asked to resume from.
requested: u64,
/// Earliest position reported as available. Rendered with `{:?}`, so the
/// message shows `Some(n)` or `None`.
/// NOTE(review): presumably `None` means the earliest position is unknown —
/// confirm with the code that constructs this variant.
earliest_available: Option<u64>,
},
}
/// Common interface implemented by every source.
///
/// The trait requires `Send + Sync`, and all methods take `&self`, so any
/// state an implementation changes must live behind interior mutability.
/// Async methods are provided through the `async_trait` macro.
///
/// Required methods: `id`, `type_name`, `properties`, `start`, `stop`,
/// `status`, `subscribe`, `as_any`, `initialize`.
/// Methods with default bodies: `dispatch_mode`, `auto_start`,
/// `supports_replay`, `deprovision`, `set_bootstrap_provider`.
#[async_trait]
pub trait Source: Send + Sync {
/// Returns the identifier of this source, borrowed from `self`.
fn id(&self) -> &str;
/// Returns the name of this source's type, borrowed from `self`.
fn type_name(&self) -> &str;
/// Returns the source's properties as an owned map from property name to
/// JSON value.
fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
/// Returns the dispatch mode of this source.
///
/// Defaults to `DispatchMode::Channel`.
fn dispatch_mode(&self) -> DispatchMode {
DispatchMode::Channel
}
/// Returns the auto-start flag for this source.
///
/// Defaults to `true`.
/// NOTE(review): presumably consulted by the host to decide whether to call
/// `start` automatically — confirm with the caller.
fn auto_start(&self) -> bool {
true
}
/// Returns whether this source supports replay.
///
/// Defaults to `false`.
/// NOTE(review): presumably related to resuming from an earlier position —
/// confirm with the code that reads this flag.
fn supports_replay(&self) -> bool {
false
}
/// Starts the source.
///
/// # Errors
/// Returns an `anyhow` error if the implementation fails to start.
async fn start(&self) -> Result<()>;
/// Stops the source.
///
/// # Errors
/// Returns an `anyhow` error if the implementation fails to stop.
async fn stop(&self) -> Result<()>;
/// Returns the current status of the source.
async fn status(&self) -> ComponentStatus;
/// Subscribes to this source using the given subscription `settings`
/// (taken by value).
///
/// # Errors
/// Returns an `anyhow` error if the implementation cannot produce a
/// `SubscriptionResponse`.
async fn subscribe(
&self,
settings: crate::config::SourceSubscriptionSettings,
) -> Result<SubscriptionResponse>;
/// Exposes the source as `&dyn Any`, which lets callers attempt a downcast to
/// a concrete type.
fn as_any(&self) -> &dyn std::any::Any;
/// Deprovisions the source.
///
/// The default implementation does nothing and returns `Ok(())`.
async fn deprovision(&self) -> Result<()> {
Ok(())
}
/// Initializes the source with a runtime `context` (taken by value).
///
/// There is no return value, so failures cannot be reported through this
/// method's signature.
async fn initialize(&self, context: SourceRuntimeContext);
/// Hands a bootstrap provider to the source.
///
/// The default implementation ignores the provider: the body is empty, so the
/// boxed value is simply dropped.
async fn set_bootstrap_provider(
&self,
_provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
) {
}
}
/// Forwarding implementation that lets a boxed trait object be used wherever
/// a `Source` is expected.
///
/// Every trait method is delegated, including those with default bodies in the
/// trait. Overrides on the inner source are therefore honored instead of being
/// replaced by the trait defaults.
///
/// Each call is written in fully-qualified form against `dyn Source`, with
/// `&**self` peeling off the reference and the `Box`. The call always
/// dispatches to the boxed object and cannot resolve back to this impl.
#[async_trait]
impl Source for Box<dyn Source + 'static> {
    /// Forwards to the inner source's `id`.
    fn id(&self) -> &str {
        <dyn Source as Source>::id(&**self)
    }

    /// Forwards to the inner source's `type_name`.
    fn type_name(&self) -> &str {
        <dyn Source as Source>::type_name(&**self)
    }

    /// Forwards to the inner source's `properties`.
    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
        <dyn Source as Source>::properties(&**self)
    }

    /// Forwards to the inner source's `dispatch_mode`.
    fn dispatch_mode(&self) -> DispatchMode {
        <dyn Source as Source>::dispatch_mode(&**self)
    }

    /// Forwards to the inner source's `auto_start`.
    fn auto_start(&self) -> bool {
        <dyn Source as Source>::auto_start(&**self)
    }

    /// Forwards to the inner source's `supports_replay`.
    fn supports_replay(&self) -> bool {
        <dyn Source as Source>::supports_replay(&**self)
    }

    /// Forwards to the inner source's `start`.
    async fn start(&self) -> Result<()> {
        <dyn Source as Source>::start(&**self).await
    }

    /// Forwards to the inner source's `stop`.
    async fn stop(&self) -> Result<()> {
        <dyn Source as Source>::stop(&**self).await
    }

    /// Forwards to the inner source's `status`.
    async fn status(&self) -> ComponentStatus {
        <dyn Source as Source>::status(&**self).await
    }

    /// Forwards to the inner source's `subscribe`, passing `settings` through.
    async fn subscribe(
        &self,
        settings: crate::config::SourceSubscriptionSettings,
    ) -> Result<SubscriptionResponse> {
        <dyn Source as Source>::subscribe(&**self, settings).await
    }

    /// Returns the inner source's `as_any`, not an `Any` view of the `Box`.
    fn as_any(&self) -> &dyn std::any::Any {
        <dyn Source as Source>::as_any(&**self)
    }

    /// Forwards to the inner source's `deprovision`.
    async fn deprovision(&self) -> Result<()> {
        <dyn Source as Source>::deprovision(&**self).await
    }

    /// Forwards to the inner source's `initialize`, passing `context` through.
    async fn initialize(&self, context: SourceRuntimeContext) {
        <dyn Source as Source>::initialize(&**self, context).await
    }

    /// Forwards to the inner source's `set_bootstrap_provider`, handing over
    /// ownership of `provider`.
    async fn set_bootstrap_provider(
        &self,
        provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
    ) {
        <dyn Source as Source>::set_bootstrap_provider(&**self, provider).await
    }
}