1use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use pollen_claimer::{Claimer, ClaimerConfig};
9use pollen_clock::SharedClock;
10use pollen_executor::{Executor, TokioExecutor};
11use pollen_membership::{ChitchatMembership, Membership};
12use pollen_router::{ConsistentHashRouter, TaskRouter};
13use pollen_scheduler::{DefaultScheduler, SharedScheduler};
14use pollen_store::{MemoryStore, SqliteStore, StorageConfig, StoreBackend};
15use pollen_types::{NodeId, Result, Schedule, TaskDef};
16use tokio_util::sync::CancellationToken;
17use tracing::info;
18
19#[derive(Debug, Clone)]
21pub enum Storage {
22 Sqlite(PathBuf),
24 Memory,
26}
27
28impl Storage {
29 pub fn sqlite<P: Into<PathBuf>>(path: P) -> Self {
31 Storage::Sqlite(path.into())
32 }
33
34 pub fn memory() -> Self {
36 Storage::Memory
37 }
38}
39
40#[derive(Debug, Clone)]
42pub struct ClusterConfig {
43 pub bind_addr: SocketAddr,
45 pub seeds: Vec<SocketAddr>,
47 pub cluster_name: String,
49 pub metadata: Vec<(String, String)>,
51}
52
53impl Default for ClusterConfig {
54 fn default() -> Self {
55 Self {
56 bind_addr: "0.0.0.0:7000".parse().unwrap(),
57 seeds: Vec::new(),
58 cluster_name: "pollen".to_string(),
59 metadata: Vec::new(),
60 }
61 }
62}
63
64#[derive(Debug, Clone)]
66pub struct ExecutorSettings {
67 pub max_concurrent: usize,
69 pub default_timeout: Duration,
71}
72
73impl Default for ExecutorSettings {
74 fn default() -> Self {
75 Self {
76 max_concurrent: 100,
77 default_timeout: Duration::from_secs(300),
78 }
79 }
80}
81
82pub struct PollenBuilder {
84 storage: Storage,
85 cluster: Option<ClusterConfig>,
86 executor: ExecutorSettings,
87}
88
89impl PollenBuilder {
90 pub fn new() -> Self {
92 Self {
93 storage: Storage::Memory,
94 cluster: None,
95 executor: ExecutorSettings::default(),
96 }
97 }
98
99 pub fn storage(mut self, storage: Storage) -> Self {
101 self.storage = storage;
102 self
103 }
104
105 pub fn cluster(mut self, config: ClusterConfig) -> Self {
107 self.cluster = Some(config);
108 self
109 }
110
111 pub fn executor(mut self, config: ExecutorSettings) -> Self {
113 self.executor = config;
114 self
115 }
116
117 pub async fn build(self) -> Result<Pollen> {
119 let node_id = NodeId::new();
121 info!("Starting Pollen with node ID: {:?}", node_id);
122
123 let clock = pollen_clock::new_clock_with_id(node_id);
127
128 let store: Arc<StoreBackend> = match &self.storage {
130 Storage::Sqlite(path) => {
131 let config = StorageConfig {
132 path: path.to_string_lossy().to_string(),
133 ..Default::default()
134 };
135 let sqlite = SqliteStore::open(&config)?;
136 Arc::new(StoreBackend::Sqlite(sqlite))
137 }
138 Storage::Memory => Arc::new(StoreBackend::Memory(MemoryStore::new())),
139 };
140
141 let executor_config = pollen_executor::ExecutorConfig {
143 max_concurrent: self.executor.max_concurrent,
144 default_timeout: self.executor.default_timeout,
145 };
146 let executor: Arc<dyn Executor> = Arc::new(TokioExecutor::new(executor_config));
147
148 #[allow(clippy::type_complexity)]
150 let (membership, router): (
151 Option<Arc<dyn Membership>>,
152 Option<Arc<dyn TaskRouter>>,
153 ) = if let Some(ref cluster) = self.cluster {
154 let types_config = pollen_types::ClusterConfig {
156 bind_addr: cluster.bind_addr,
157 seeds: cluster.seeds.clone(),
158 cluster_name: cluster.cluster_name.clone(),
159 metadata: cluster.metadata.iter().cloned().collect(),
160 ..Default::default()
161 };
162
163 let membership: Arc<dyn Membership> =
165 Arc::new(ChitchatMembership::new(types_config, node_id).await?);
166
167 let router: Arc<dyn TaskRouter> =
169 Arc::new(ConsistentHashRouter::new(node_id, Arc::clone(&membership)));
170
171 info!(
172 "Pollen cluster mode initialized at {}",
173 cluster.bind_addr
174 );
175
176 (Some(membership), Some(router))
177 } else {
178 (None, None)
179 };
180
181 let scheduler = Arc::new(DefaultScheduler::new(
185 clock.clone(),
186 Arc::clone(&store),
187 None, ));
189
190 scheduler.clone().start();
192
193 let scheduler: SharedScheduler = scheduler;
194
195 let claimer_config = ClaimerConfig::default();
197 let claimer = Arc::new(Claimer::new(
198 node_id,
199 Arc::clone(&store),
200 router.clone(),
201 Arc::clone(&executor),
202 Arc::clone(&scheduler),
203 membership.clone(),
204 claimer_config,
205 ));
206
207 let shutdown = CancellationToken::new();
209
210 let claimer_handle = claimer.clone();
212 let shutdown_clone = shutdown.clone();
213 tokio::spawn(async move {
214 claimer_handle.run(shutdown_clone).await;
215 });
216
217 let mode = if self.cluster.is_some() {
218 "cluster"
219 } else {
220 "single-node"
221 };
222 info!("Pollen scheduler initialized in {} mode", mode);
223
224 Ok(Pollen {
225 node_id,
226 _clock: clock,
227 _store: store,
228 _executor: executor,
229 scheduler,
230 claimer,
231 membership,
232 shutdown,
233 _storage_config: self.storage,
234 _cluster_config: self.cluster,
235 })
236 }
237}
238
239impl Default for PollenBuilder {
240 fn default() -> Self {
241 Self::new()
242 }
243}
244
245pub struct Pollen {
247 node_id: NodeId,
248 _clock: SharedClock,
249 _store: Arc<StoreBackend>,
250 _executor: Arc<dyn Executor>,
251 scheduler: SharedScheduler,
252 #[allow(dead_code)] claimer: Arc<Claimer>,
254 membership: Option<Arc<dyn Membership>>,
255 shutdown: CancellationToken,
256 _storage_config: Storage,
257 _cluster_config: Option<ClusterConfig>,
258}
259
260impl Pollen {
261 pub fn builder() -> PollenBuilder {
263 PollenBuilder::new()
264 }
265
266 pub fn node_id(&self) -> NodeId {
268 self.node_id
269 }
270
271 pub fn is_cluster_mode(&self) -> bool {
273 self._cluster_config.is_some()
274 }
275
276 pub fn define(&self, name: &str) -> TaskBuilder {
278 TaskBuilder::new(name.to_string(), Arc::clone(&self.scheduler))
279 }
280
281 pub fn get_task(&self, name: &str) -> Option<TaskDef> {
283 self.scheduler.get_task_by_name(name)
284 }
285
286 pub async fn trigger(&self, name: &str) -> Result<()> {
288 let task = self
289 .scheduler
290 .get_task_by_name(name)
291 .ok_or_else(|| pollen_types::PollenError::TaskNameNotFound(name.to_string()))?;
292 self.scheduler.trigger(&task.id, None).await?;
293 Ok(())
294 }
295
296 pub async fn trigger_with_payload(&self, name: &str, payload: bytes::Bytes) -> Result<()> {
298 let task = self
299 .scheduler
300 .get_task_by_name(name)
301 .ok_or_else(|| pollen_types::PollenError::TaskNameNotFound(name.to_string()))?;
302 self.scheduler.trigger(&task.id, Some(payload)).await?;
303 Ok(())
304 }
305
306 pub fn task_info(&self, name: &str) -> Option<TaskInfo> {
308 self.scheduler.get_task_by_name(name).map(|task| TaskInfo {
309 id: task.id.clone(),
310 name: task.name.clone(),
311 enabled: task.enabled,
312 last_run: None, next_run: None, })
315 }
316
317 pub fn list_tasks(&self) -> Vec<TaskDef> {
319 self.scheduler.list_tasks()
320 }
321
322 pub fn cluster_state(&self) -> Option<ClusterState> {
324 if let Some(ref membership) = self.membership {
325 let members = membership.alive_members();
326 Some(ClusterState {
327 local_node: self.node_id,
328 nodes: members
329 .into_iter()
330 .map(|m| NodeInfo {
331 id: m.id,
332 addr: m.addr,
333 state: match m.state {
334 pollen_types::MemberState::Alive => NodeState::Alive,
335 pollen_types::MemberState::Suspect => NodeState::Suspect,
336 pollen_types::MemberState::Dead | pollen_types::MemberState::Left => {
337 NodeState::Dead
338 }
339 },
340 })
341 .collect(),
342 })
343 } else {
344 None
345 }
346 }
347
348 pub fn cluster_size(&self) -> usize {
350 self.membership
351 .as_ref()
352 .map(|m| m.alive_members().len())
353 .unwrap_or(1)
354 }
355
356 pub async fn shutdown(self) {
358 info!("Shutting down Pollen scheduler...");
359 self.shutdown.cancel();
360
361 if let Some(membership) = &self.membership {
363 membership.shutdown().await;
364 }
365
366 tokio::time::sleep(Duration::from_millis(100)).await;
368 info!("Pollen scheduler shutdown complete");
369 }
370}
371
372pub struct TaskBuilder {
374 name: String,
375 scheduler: SharedScheduler,
376 schedule: Option<Schedule>,
377 handler: Option<Arc<dyn pollen_executor::TaskHandler>>,
378 retry_policy: Option<pollen_types::RetryPolicy>,
379 timeout: Option<Duration>,
380}
381
382impl TaskBuilder {
383 fn new(name: String, scheduler: SharedScheduler) -> Self {
384 Self {
385 name,
386 scheduler,
387 schedule: None,
388 handler: None,
389 retry_policy: None,
390 timeout: None,
391 }
392 }
393
394 pub fn schedule(mut self, schedule: Schedule) -> Self {
396 self.schedule = Some(schedule);
397 self
398 }
399
400 pub fn handler<F, Fut>(mut self, handler: F) -> Self
402 where
403 F: Fn() -> Fut + Send + Sync + 'static,
404 Fut: std::future::Future<Output = Result<()>> + Send + 'static,
405 {
406 self.handler = Some(pollen_executor::simple_handler(handler));
407 self
408 }
409
410 pub fn retry(mut self, policy: pollen_types::RetryPolicy) -> Self {
412 self.retry_policy = Some(policy);
413 self
414 }
415
416 pub fn timeout(mut self, timeout: Duration) -> Self {
418 self.timeout = Some(timeout);
419 self
420 }
421
422 pub async fn register(self) -> Result<()> {
424 let schedule = self.schedule.ok_or_else(|| {
425 pollen_types::PollenError::Config("Task schedule is required".to_string())
426 })?;
427
428 let handler = self.handler.ok_or_else(|| {
429 pollen_types::PollenError::Config("Task handler is required".to_string())
430 })?;
431
432 let mut task = TaskDef::new(&self.name, schedule);
433
434 if let Some(retry) = self.retry_policy {
435 task = task.with_retry(retry);
436 }
437
438 if let Some(timeout) = self.timeout {
439 task = task.with_timeout(timeout);
440 }
441
442 self.scheduler.register(task, handler).await
443 }
444}
445
446#[derive(Debug, Clone)]
448pub struct TaskInfo {
449 pub id: pollen_types::TaskId,
450 pub name: String,
451 pub enabled: bool,
452 pub last_run: Option<chrono::DateTime<chrono::Utc>>,
453 pub next_run: Option<chrono::DateTime<chrono::Utc>>,
454}
455
456#[derive(Debug, Clone)]
458pub struct ClusterState {
459 pub local_node: pollen_types::NodeId,
460 pub nodes: Vec<NodeInfo>,
461}
462
463#[derive(Debug, Clone)]
465pub struct NodeInfo {
466 pub id: pollen_types::NodeId,
467 pub addr: SocketAddr,
468 pub state: NodeState,
469}
470
471#[derive(Debug, Clone, Copy, PartialEq, Eq)]
473pub enum NodeState {
474 Alive,
475 Suspect,
476 Dead,
477}