use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use pollen_claimer::{Claimer, ClaimerConfig};
use pollen_clock::SharedClock;
use pollen_executor::{Executor, TokioExecutor};
use pollen_membership::{ChitchatMembership, Membership};
use pollen_router::{ConsistentHashRouter, TaskRouter};
use pollen_scheduler::{DefaultScheduler, SharedScheduler};
use pollen_store::{MemoryStore, SqliteStore, StorageConfig, StoreBackend};
use pollen_types::{NodeId, Result, Schedule, TaskDef};
use tokio_util::sync::CancellationToken;
use tracing::info;
/// Selects which store backend [`PollenBuilder::build`] constructs.
#[derive(Debug, Clone)]
pub enum Storage {
    /// Open a `SqliteStore` using the given filesystem path.
    Sqlite(PathBuf),
    /// Use a fresh `MemoryStore`.
    Memory,
}

impl Storage {
    /// Builds the [`Storage::Sqlite`] variant from anything convertible
    /// into a [`PathBuf`] (e.g. `&str`, `String`, `&Path`).
    pub fn sqlite<P>(path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        let db_path: PathBuf = path.into();
        Self::Sqlite(db_path)
    }

    /// Builds the [`Storage::Memory`] variant.
    pub fn memory() -> Self {
        Self::Memory
    }
}
/// Cluster settings accepted by [`PollenBuilder::cluster`].
///
/// During `build` these values are copied field-by-field into a
/// `pollen_types::ClusterConfig` that is handed to the membership layer.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// Socket address forwarded as the membership layer's `bind_addr`.
    pub bind_addr: SocketAddr,
    /// Addresses forwarded as `seeds`; presumably the peers contacted to
    /// join an existing cluster (TODO confirm against the membership crate).
    pub seeds: Vec<SocketAddr>,
    /// Name forwarded as `cluster_name`.
    pub cluster_name: String,
    /// Key/value pairs forwarded as `metadata`.
    pub metadata: Vec<(String, String)>,
}

impl Default for ClusterConfig {
    /// Defaults: bind to `0.0.0.0:7000`, no seeds, cluster name `"pollen"`,
    /// no metadata.
    fn default() -> Self {
        Self {
            // Constructed directly rather than parsed from a string, so there
            // is no runtime parsing and no `unwrap` panic path.
            bind_addr: SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 7000)),
            seeds: Vec::new(),
            cluster_name: "pollen".to_string(),
            metadata: Vec::new(),
        }
    }
}
/// Executor tuning knobs accepted by [`PollenBuilder::executor`].
///
/// Both fields are copied verbatim into `pollen_executor::ExecutorConfig`
/// when the scheduler is built.
#[derive(Debug, Clone)]
pub struct ExecutorSettings {
    /// Forwarded as `ExecutorConfig::max_concurrent`; presumably the cap on
    /// simultaneously running tasks (TODO confirm in the executor crate).
    pub max_concurrent: usize,
    /// Forwarded as `ExecutorConfig::default_timeout`.
    pub default_timeout: Duration,
}

impl Default for ExecutorSettings {
    /// Defaults: `max_concurrent = 100`, `default_timeout = 300 s`.
    fn default() -> Self {
        let max_concurrent = 100;
        // Five minutes, expressed in seconds.
        let default_timeout = Duration::from_secs(5 * 60);
        ExecutorSettings {
            max_concurrent,
            default_timeout,
        }
    }
}
/// Collects the configuration used by `build` to assemble a [`Pollen`] instance.
pub struct PollenBuilder {
/// Which store backend to construct; `new` starts with `Storage::Memory`.
storage: Storage,
/// Cluster settings; `None` means `build` creates no membership or router.
cluster: Option<ClusterConfig>,
/// Values copied into `pollen_executor::ExecutorConfig` by `build`.
executor: ExecutorSettings,
}
impl PollenBuilder {
pub fn new() -> Self {
Self {
storage: Storage::Memory,
cluster: None,
executor: ExecutorSettings::default(),
}
}
pub fn storage(mut self, storage: Storage) -> Self {
self.storage = storage;
self
}
pub fn cluster(mut self, config: ClusterConfig) -> Self {
self.cluster = Some(config);
self
}
pub fn executor(mut self, config: ExecutorSettings) -> Self {
self.executor = config;
self
}
pub async fn build(self) -> Result<Pollen> {
let node_id = NodeId::new();
info!("Starting Pollen with node ID: {:?}", node_id);
let clock = pollen_clock::new_clock_with_id(node_id);
let store: Arc<StoreBackend> = match &self.storage {
Storage::Sqlite(path) => {
let config = StorageConfig {
path: path.to_string_lossy().to_string(),
..Default::default()
};
let sqlite = SqliteStore::open(&config)?;
Arc::new(StoreBackend::Sqlite(sqlite))
}
Storage::Memory => Arc::new(StoreBackend::Memory(MemoryStore::new())),
};
let executor_config = pollen_executor::ExecutorConfig {
max_concurrent: self.executor.max_concurrent,
default_timeout: self.executor.default_timeout,
};
let executor: Arc<dyn Executor> = Arc::new(TokioExecutor::new(executor_config));
#[allow(clippy::type_complexity)]
let (membership, router): (
Option<Arc<dyn Membership>>,
Option<Arc<dyn TaskRouter>>,
) = if let Some(ref cluster) = self.cluster {
let types_config = pollen_types::ClusterConfig {
bind_addr: cluster.bind_addr,
seeds: cluster.seeds.clone(),
cluster_name: cluster.cluster_name.clone(),
metadata: cluster.metadata.iter().cloned().collect(),
..Default::default()
};
let membership: Arc<dyn Membership> =
Arc::new(ChitchatMembership::new(types_config, node_id).await?);
let router: Arc<dyn TaskRouter> =
Arc::new(ConsistentHashRouter::new(node_id, Arc::clone(&membership)));
info!(
"Pollen cluster mode initialized at {}",
cluster.bind_addr
);
(Some(membership), Some(router))
} else {
(None, None)
};
let scheduler = Arc::new(DefaultScheduler::new(
clock.clone(),
Arc::clone(&store),
None, ));
scheduler.clone().start();
let scheduler: SharedScheduler = scheduler;
let claimer_config = ClaimerConfig::default();
let claimer = Arc::new(Claimer::new(
node_id,
Arc::clone(&store),
router.clone(),
Arc::clone(&executor),
Arc::clone(&scheduler),
membership.clone(),
claimer_config,
));
let shutdown = CancellationToken::new();
let claimer_handle = claimer.clone();
let shutdown_clone = shutdown.clone();
tokio::spawn(async move {
claimer_handle.run(shutdown_clone).await;
});
let mode = if self.cluster.is_some() {
"cluster"
} else {
"single-node"
};
info!("Pollen scheduler initialized in {} mode", mode);
Ok(Pollen {
node_id,
_clock: clock,
_store: store,
_executor: executor,
scheduler,
claimer,
membership,
shutdown,
_storage_config: self.storage,
_cluster_config: self.cluster,
})
}
}
impl Default for PollenBuilder {
fn default() -> Self {
Self::new()
}
}
/// Handle to a scheduler instance assembled by `PollenBuilder::build`.
pub struct Pollen {
/// Identifier generated for this node when the instance was built.
node_id: NodeId,
/// Clock created from the node id; stored but not read by the methods in this file.
_clock: SharedClock,
/// Store backend, also shared with the scheduler and the claimer.
_store: Arc<StoreBackend>,
/// Executor, also shared with the claimer.
_executor: Arc<dyn Executor>,
/// Scheduler used for task registration, lookup, listing and triggering.
scheduler: SharedScheduler,
/// Claimer whose `run` loop was spawned as a background task during `build`.
#[allow(dead_code)] claimer: Arc<Claimer>,
/// Cluster membership; `Some` only when a cluster config was supplied to the builder.
membership: Option<Arc<dyn Membership>>,
/// Token whose clone was passed to the claimer's `run`; cancelled by `shutdown`.
shutdown: CancellationToken,
/// Storage selection the instance was built with; stored but not read in this file.
_storage_config: Storage,
/// Cluster configuration the instance was built with; read by `is_cluster_mode`.
_cluster_config: Option<ClusterConfig>,
}
impl Pollen {
    /// Returns a fresh [`PollenBuilder`].
    pub fn builder() -> PollenBuilder {
        PollenBuilder::new()
    }

    /// Returns this node's identifier.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// Returns `true` when the instance was built with a cluster configuration.
    pub fn is_cluster_mode(&self) -> bool {
        self._cluster_config.is_some()
    }

    /// Starts defining a task called `name`; finish with
    /// [`TaskBuilder::register`].
    pub fn define(&self, name: &str) -> TaskBuilder {
        TaskBuilder::new(name.to_string(), Arc::clone(&self.scheduler))
    }

    /// Looks up a task definition by name in the scheduler.
    pub fn get_task(&self, name: &str) -> Option<TaskDef> {
        self.scheduler.get_task_by_name(name)
    }

    /// Triggers the task called `name` without a payload.
    ///
    /// # Errors
    ///
    /// Returns `PollenError::TaskNameNotFound` when no task has that name,
    /// or whatever error the scheduler's `trigger` reports.
    pub async fn trigger(&self, name: &str) -> Result<()> {
        self.trigger_by_name(name, None).await
    }

    /// Triggers the task called `name`, passing `payload` to the scheduler.
    ///
    /// # Errors
    ///
    /// Returns `PollenError::TaskNameNotFound` when no task has that name,
    /// or whatever error the scheduler's `trigger` reports.
    pub async fn trigger_with_payload(&self, name: &str, payload: bytes::Bytes) -> Result<()> {
        self.trigger_by_name(name, Some(payload)).await
    }

    /// Shared implementation of the two public trigger methods: resolve the
    /// task by name, then ask the scheduler to trigger it.
    async fn trigger_by_name(&self, name: &str, payload: Option<bytes::Bytes>) -> Result<()> {
        let task = self
            .scheduler
            .get_task_by_name(name)
            .ok_or_else(|| pollen_types::PollenError::TaskNameNotFound(name.to_string()))?;
        // The scheduler's success value is intentionally discarded.
        self.scheduler.trigger(&task.id, payload).await?;
        Ok(())
    }

    /// Returns a summary of the task called `name`, if it exists.
    ///
    /// `last_run` and `next_run` are not populated here and are always `None`.
    pub fn task_info(&self, name: &str) -> Option<TaskInfo> {
        // The looked-up `TaskDef` is owned, so its fields are moved rather
        // than cloned.
        self.scheduler.get_task_by_name(name).map(|task| TaskInfo {
            id: task.id,
            name: task.name,
            enabled: task.enabled,
            last_run: None,
            next_run: None,
        })
    }

    /// Returns every task definition known to the scheduler.
    pub fn list_tasks(&self) -> Vec<TaskDef> {
        self.scheduler.list_tasks()
    }

    /// Returns a snapshot of the members reported by `alive_members`, or
    /// `None` when there is no membership (single-node mode).
    pub fn cluster_state(&self) -> Option<ClusterState> {
        let membership = self.membership.as_ref()?;
        let nodes = membership
            .alive_members()
            .into_iter()
            .map(|member| NodeInfo {
                id: member.id,
                addr: member.addr,
                // `Left` is folded into `Dead`; the public enum has no
                // separate variant for it.
                state: match member.state {
                    pollen_types::MemberState::Alive => NodeState::Alive,
                    pollen_types::MemberState::Suspect => NodeState::Suspect,
                    pollen_types::MemberState::Dead | pollen_types::MemberState::Left => {
                        NodeState::Dead
                    }
                },
            })
            .collect();
        Some(ClusterState {
            local_node: self.node_id,
            nodes,
        })
    }

    /// Returns the number of members reported by `alive_members`, or `1`
    /// when there is no membership.
    pub fn cluster_size(&self) -> usize {
        self.membership
            .as_ref()
            .map_or(1, |membership| membership.alive_members().len())
    }

    /// Consumes the handle and shuts the instance down.
    ///
    /// Cancels the shared token (a clone of which was given to the claimer's
    /// run loop), shuts down membership when present, then sleeps 100 ms
    /// before logging completion.
    pub async fn shutdown(self) {
        info!("Shutting down Pollen scheduler...");
        self.shutdown.cancel();
        if let Some(membership) = &self.membership {
            membership.shutdown().await;
        }
        // NOTE(review): this looks like a fixed grace period for the detached
        // claimer task, whose join handle is not kept — confirm.
        tokio::time::sleep(Duration::from_millis(100)).await;
        info!("Pollen scheduler shutdown complete");
    }
}
/// Collects the pieces of a task definition before it is registered with the scheduler.
pub struct TaskBuilder {
/// Task name passed to `TaskDef::new` by `register`.
name: String,
/// Scheduler the finished task is registered with.
scheduler: SharedScheduler,
/// Schedule for the task; `register` fails if this is still `None`.
schedule: Option<Schedule>,
/// Handler for the task; `register` fails if this is still `None`.
handler: Option<Arc<dyn pollen_executor::TaskHandler>>,
/// Optional retry policy, applied with `TaskDef::with_retry` when set.
retry_policy: Option<pollen_types::RetryPolicy>,
/// Optional timeout, applied with `TaskDef::with_timeout` when set.
timeout: Option<Duration>,
}
impl TaskBuilder {
fn new(name: String, scheduler: SharedScheduler) -> Self {
Self {
name,
scheduler,
schedule: None,
handler: None,
retry_policy: None,
timeout: None,
}
}
pub fn schedule(mut self, schedule: Schedule) -> Self {
self.schedule = Some(schedule);
self
}
pub fn handler<F, Fut>(mut self, handler: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
self.handler = Some(pollen_executor::simple_handler(handler));
self
}
pub fn retry(mut self, policy: pollen_types::RetryPolicy) -> Self {
self.retry_policy = Some(policy);
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub async fn register(self) -> Result<()> {
let schedule = self.schedule.ok_or_else(|| {
pollen_types::PollenError::Config("Task schedule is required".to_string())
})?;
let handler = self.handler.ok_or_else(|| {
pollen_types::PollenError::Config("Task handler is required".to_string())
})?;
let mut task = TaskDef::new(&self.name, schedule);
if let Some(retry) = self.retry_policy {
task = task.with_retry(retry);
}
if let Some(timeout) = self.timeout {
task = task.with_timeout(timeout);
}
self.scheduler.register(task, handler).await
}
}
/// Summary of a registered task, as returned by `Pollen::task_info`.
#[derive(Debug, Clone)]
pub struct TaskInfo {
/// Identifier taken from the task definition.
pub id: pollen_types::TaskId,
/// Name taken from the task definition.
pub name: String,
/// `enabled` flag taken from the task definition.
pub enabled: bool,
/// Time of the last run; `Pollen::task_info` currently always sets this to `None`.
pub last_run: Option<chrono::DateTime<chrono::Utc>>,
/// Time of the next run; `Pollen::task_info` currently always sets this to `None`.
pub next_run: Option<chrono::DateTime<chrono::Utc>>,
}
/// Snapshot of cluster membership, as returned by `Pollen::cluster_state`.
#[derive(Debug, Clone)]
pub struct ClusterState {
/// Identifier of the node that produced this snapshot.
pub local_node: pollen_types::NodeId,
/// One entry per member returned by the membership layer's `alive_members`.
pub nodes: Vec<NodeInfo>,
}
/// A single cluster member inside a `ClusterState` snapshot.
#[derive(Debug, Clone)]
pub struct NodeInfo {
/// The member's node identifier.
pub id: pollen_types::NodeId,
/// The member's socket address as reported by the membership layer.
pub addr: SocketAddr,
/// The member's state, mapped from `pollen_types::MemberState`.
pub state: NodeState,
}
/// Public view of a member's state, mapped from `pollen_types::MemberState` in `Pollen::cluster_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
/// Mapped from `MemberState::Alive`.
Alive,
/// Mapped from `MemberState::Suspect`.
Suspect,
/// Mapped from both `MemberState::Dead` and `MemberState::Left`.
Dead,
}