use std::sync::Arc;
use sqlx::postgres::PgPool;
use crate::config::ShardingConfig;
use crate::entity::Entity;
use crate::entity_client::EntityClient;
use crate::error::ClusterError;
use crate::metrics::ClusterMetrics;
use crate::sharding::Sharding;
use crate::sharding_impl::ShardingImpl;
use crate::storage::noop_runners::NoopRunners;
use crate::storage::sql_message::SqlMessageStorage;
/// Runner handle returned by [`SingleRunnerBuilder::build`].
///
/// Holds the shared sharding instance together with the configuration it was
/// constructed from.
pub struct SingleRunner {
    /// Shared sharding implementation that entity registration, client
    /// creation, and shutdown are delegated to.
    sharding: Arc<ShardingImpl>,
    /// Configuration that was handed to the sharding implementation at build time.
    config: Arc<ShardingConfig>,
}
/// Builder that collects the inputs needed to construct a [`SingleRunner`].
pub struct SingleRunnerBuilder {
    /// Postgres pool used for running migrations and for message storage.
    pool: PgPool,
    /// Sharding configuration; starts out as `ShardingConfig::default()`.
    config: ShardingConfig,
    /// Optional migrations table name; when `None`, the storage builder is
    /// used without overriding it.
    migrations_table: Option<String>,
}
impl SingleRunnerBuilder {
    /// Creates a builder backed by `pool`, using the default
    /// [`ShardingConfig`] and no custom migrations table.
    pub fn new(pool: PgPool) -> Self {
        Self {
            pool,
            config: ShardingConfig::default(),
            migrations_table: None,
        }
    }

    /// Replaces the sharding configuration used by [`build`](Self::build).
    pub fn config(mut self, config: ShardingConfig) -> Self {
        self.config = config;
        self
    }

    /// Sets the migrations table name that is passed to the storage builder
    /// before migrations are run.
    pub fn migrations_table(mut self, migrations_table: impl Into<String>) -> Self {
        self.migrations_table = Some(migrations_table.into());
        self
    }

    /// Runs storage migrations, wires up a [`ShardingImpl`], and returns the
    /// resulting [`SingleRunner`].
    ///
    /// The sharding instance is created with [`NoopRunners`], SQL-backed
    /// message storage on the builder's pool, and metrics obtained from
    /// `ClusterMetrics::unregistered()`. All shards are acquired before this
    /// method returns.
    ///
    /// # Errors
    ///
    /// Returns a [`ClusterError`] if running the migrations fails or if
    /// `ShardingImpl::new` rejects the supplied arguments.
    pub async fn build(self) -> Result<SingleRunner, ClusterError> {
        // Migrate first, in its own scope, so the storage builder (which
        // borrows the pool) is gone before the pool is moved below.
        {
            let storage = crate::storage::Storage::builder(&self.pool);
            if let Some(migrations_table) = self.migrations_table.as_deref() {
                storage.migrations_table(migrations_table).migrate().await?;
            } else {
                storage.migrate().await?;
            }
        }

        // Saturate instead of wrapping: a plain `as u32` cast would silently
        // truncate an inbox size that does not fit in `u32`, which could turn
        // a very large configured value into a tiny (or zero) batch limit.
        let batch_limit = u32::try_from(self.config.storage_inbox_size).unwrap_or(u32::MAX);

        let message_storage = Arc::new(
            SqlMessageStorage::with_max_retries(self.pool, self.config.storage_message_max_retries)
                .with_batch_limit(batch_limit)
                .with_last_read_guard_interval(self.config.last_read_guard_interval),
        );

        let config = Arc::new(self.config);
        let runners = Arc::new(NoopRunners);
        let metrics = Arc::new(ClusterMetrics::unregistered());

        // NOTE(review): the two `None` arguments are optional collaborators of
        // `ShardingImpl::new` that this runner leaves unset; their meaning is
        // not visible from this file.
        let sharding = ShardingImpl::new(
            Arc::clone(&config),
            runners,
            None,
            None,
            Some(message_storage),
            metrics,
        )?;

        sharding.acquire_all_shards().await;

        Ok(SingleRunner { sharding, config })
    }
}
impl SingleRunner {
    /// Starts a [`SingleRunnerBuilder`] backed by `pool`.
    ///
    /// Equivalent to calling `SingleRunnerBuilder::new(pool)`.
    pub fn builder(pool: PgPool) -> SingleRunnerBuilder {
        SingleRunnerBuilder::new(pool)
    }

    /// Registers `entity` with the sharding instance and returns a client for
    /// its entity type.
    ///
    /// # Errors
    ///
    /// Propagates the [`ClusterError`] returned by `register_entity`.
    pub async fn register(
        &self,
        entity: impl Entity + 'static,
    ) -> Result<EntityClient, ClusterError> {
        let shared_entity = Arc::new(entity);
        // Read the type before the entity is moved into the registration call.
        let kind = shared_entity.entity_type();

        self.sharding.register_entity(shared_entity).await?;

        // `make_client` is invoked on an owned handle, so clone the `Arc`.
        let client = Arc::clone(&self.sharding).make_client(kind);
        Ok(client)
    }

    /// Borrows the shared sharding implementation.
    pub fn sharding(&self) -> &Arc<ShardingImpl> {
        &self.sharding
    }

    /// Borrows the shared configuration this runner was built with.
    pub fn config(&self) -> &Arc<ShardingConfig> {
        &self.config
    }

    /// Shuts down the underlying sharding instance.
    ///
    /// # Errors
    ///
    /// Returns whatever [`ClusterError`] the sharding shutdown reports.
    pub async fn shutdown(&self) -> Result<(), ClusterError> {
        self.sharding.shutdown().await
    }
}