Skip to main content

pollen_rs/
builder.rs

1//! Builder API for Pollen scheduler.
2
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::Duration;
7
8use pollen_claimer::{Claimer, ClaimerConfig};
9use pollen_clock::SharedClock;
10use pollen_executor::{Executor, TokioExecutor};
11use pollen_membership::{ChitchatMembership, Membership};
12use pollen_router::{ConsistentHashRouter, TaskRouter};
13use pollen_scheduler::{DefaultScheduler, SharedScheduler};
14use pollen_store::{MemoryStore, SqliteStore, StorageConfig, StoreBackend};
15use pollen_types::{NodeId, Result, Schedule, TaskDef};
16use tokio_util::sync::CancellationToken;
17use tracing::info;
18
/// Where Pollen persists its state.
#[derive(Debug, Clone)]
pub enum Storage {
    /// File-backed SQLite database located at the given path.
    Sqlite(PathBuf),
    /// Non-persistent in-memory store (for testing).
    Memory,
}

impl Storage {
    /// Build a [`Storage::Sqlite`] configuration from anything convertible into a [`PathBuf`].
    pub fn sqlite<P>(path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        let db_path: PathBuf = path.into();
        Self::Sqlite(db_path)
    }

    /// Build a [`Storage::Memory`] configuration.
    pub fn memory() -> Self {
        Self::Memory
    }
}
39
/// Cluster configuration for the builder.
///
/// The default binds to `0.0.0.0:7000`, has no seed nodes, uses the cluster
/// name `"pollen"`, and carries no metadata.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// Local bind address for cluster communication
    pub bind_addr: SocketAddr,
    /// Seed nodes to contact when joining the cluster
    pub seeds: Vec<SocketAddr>,
    /// Cluster name for isolation
    pub cluster_name: String,
    /// Node metadata (key/value pairs) to propagate via gossip
    pub metadata: Vec<(String, String)>,
}

impl Default for ClusterConfig {
    fn default() -> Self {
        Self {
            // Constructed directly instead of parsing a string literal, so there
            // is no runtime parse and no `unwrap()` panic path.
            bind_addr: SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 7000)),
            seeds: Vec::new(),
            cluster_name: "pollen".to_string(),
            metadata: Vec::new(),
        }
    }
}
63
/// Tuning knobs for the task executor.
#[derive(Debug, Clone)]
pub struct ExecutorSettings {
    /// Upper bound on the number of task executions running at the same time.
    pub max_concurrent: usize,
    /// Default timeout for a task execution.
    pub default_timeout: Duration,
}

impl Default for ExecutorSettings {
    /// 100 concurrent executions and a five-minute default timeout.
    fn default() -> Self {
        let max_concurrent = 100;
        let default_timeout = Duration::from_secs(5 * 60);
        Self {
            max_concurrent,
            default_timeout,
        }
    }
}
81
82/// Builder for constructing a Pollen scheduler instance
83pub struct PollenBuilder {
84    storage: Storage,
85    cluster: Option<ClusterConfig>,
86    executor: ExecutorSettings,
87}
88
89impl PollenBuilder {
90    /// Create a new builder with default configuration
91    pub fn new() -> Self {
92        Self {
93            storage: Storage::Memory,
94            cluster: None,
95            executor: ExecutorSettings::default(),
96        }
97    }
98
99    /// Configure storage backend
100    pub fn storage(mut self, storage: Storage) -> Self {
101        self.storage = storage;
102        self
103    }
104
105    /// Configure cluster mode
106    pub fn cluster(mut self, config: ClusterConfig) -> Self {
107        self.cluster = Some(config);
108        self
109    }
110
111    /// Configure executor
112    pub fn executor(mut self, config: ExecutorSettings) -> Self {
113        self.executor = config;
114        self
115    }
116
117    /// Build the Pollen scheduler instance
118    pub async fn build(self) -> Result<Pollen> {
119        // Generate node ID
120        let node_id = NodeId::new();
121        info!("Starting Pollen with node ID: {:?}", node_id);
122
123        // 1. Initialize foundation layer
124
125        // Create HLC clock
126        let clock = pollen_clock::new_clock_with_id(node_id);
127
128        // Create storage backend
129        let store: Arc<StoreBackend> = match &self.storage {
130            Storage::Sqlite(path) => {
131                let config = StorageConfig {
132                    path: path.to_string_lossy().to_string(),
133                    ..Default::default()
134                };
135                let sqlite = SqliteStore::open(&config)?;
136                Arc::new(StoreBackend::Sqlite(sqlite))
137            }
138            Storage::Memory => Arc::new(StoreBackend::Memory(MemoryStore::new())),
139        };
140
141        // Create executor
142        let executor_config = pollen_executor::ExecutorConfig {
143            max_concurrent: self.executor.max_concurrent,
144            default_timeout: self.executor.default_timeout,
145        };
146        let executor: Arc<dyn Executor> = Arc::new(TokioExecutor::new(executor_config));
147
148        // 2. Initialize cluster layer (if configured)
149        #[allow(clippy::type_complexity)]
150        let (membership, router): (
151            Option<Arc<dyn Membership>>,
152            Option<Arc<dyn TaskRouter>>,
153        ) = if let Some(ref cluster) = self.cluster {
154            // Convert builder ClusterConfig to pollen_types::ClusterConfig
155            let types_config = pollen_types::ClusterConfig {
156                bind_addr: cluster.bind_addr,
157                seeds: cluster.seeds.clone(),
158                cluster_name: cluster.cluster_name.clone(),
159                metadata: cluster.metadata.iter().cloned().collect(),
160                ..Default::default()
161            };
162
163            // Create chitchat-based membership
164            let membership: Arc<dyn Membership> =
165                Arc::new(ChitchatMembership::new(types_config, node_id).await?);
166
167            // Create consistent hash router
168            let router: Arc<dyn TaskRouter> =
169                Arc::new(ConsistentHashRouter::new(node_id, Arc::clone(&membership)));
170
171            info!(
172                "Pollen cluster mode initialized at {}",
173                cluster.bind_addr
174            );
175
176            (Some(membership), Some(router))
177        } else {
178            (None, None)
179        };
180
181        // 3. Initialize coordination layer
182
183        // Create scheduler
184        let scheduler = Arc::new(DefaultScheduler::new(
185            clock.clone(),
186            Arc::clone(&store),
187            None, // CRDT store (future work)
188        ));
189
190        // Start scheduler background tasks (instance generation)
191        scheduler.clone().start();
192
193        let scheduler: SharedScheduler = scheduler;
194
195        // Create claimer
196        let claimer_config = ClaimerConfig::default();
197        let claimer = Arc::new(Claimer::new(
198            node_id,
199            Arc::clone(&store),
200            router.clone(),
201            Arc::clone(&executor),
202            Arc::clone(&scheduler),
203            membership.clone(),
204            claimer_config,
205        ));
206
207        // Create shutdown token
208        let shutdown = CancellationToken::new();
209
210        // 4. Start background tasks
211        let claimer_handle = claimer.clone();
212        let shutdown_clone = shutdown.clone();
213        tokio::spawn(async move {
214            claimer_handle.run(shutdown_clone).await;
215        });
216
217        let mode = if self.cluster.is_some() {
218            "cluster"
219        } else {
220            "single-node"
221        };
222        info!("Pollen scheduler initialized in {} mode", mode);
223
224        Ok(Pollen {
225            node_id,
226            _clock: clock,
227            _store: store,
228            _executor: executor,
229            scheduler,
230            claimer,
231            membership,
232            shutdown,
233            _storage_config: self.storage,
234            _cluster_config: self.cluster,
235        })
236    }
237}
238
239impl Default for PollenBuilder {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
245/// Main Pollen scheduler instance
246pub struct Pollen {
247    node_id: NodeId,
248    _clock: SharedClock,
249    _store: Arc<StoreBackend>,
250    _executor: Arc<dyn Executor>,
251    scheduler: SharedScheduler,
252    #[allow(dead_code)] // Used for graceful shutdown and future cluster mode
253    claimer: Arc<Claimer>,
254    membership: Option<Arc<dyn Membership>>,
255    shutdown: CancellationToken,
256    _storage_config: Storage,
257    _cluster_config: Option<ClusterConfig>,
258}
259
260impl Pollen {
261    /// Create a new builder
262    pub fn builder() -> PollenBuilder {
263        PollenBuilder::new()
264    }
265
266    /// Get the local node ID
267    pub fn node_id(&self) -> NodeId {
268        self.node_id
269    }
270
271    /// Check if running in cluster mode
272    pub fn is_cluster_mode(&self) -> bool {
273        self._cluster_config.is_some()
274    }
275
276    /// Define a new task
277    pub fn define(&self, name: &str) -> TaskBuilder {
278        TaskBuilder::new(name.to_string(), Arc::clone(&self.scheduler))
279    }
280
281    /// Get a task by name
282    pub fn get_task(&self, name: &str) -> Option<TaskDef> {
283        self.scheduler.get_task_by_name(name)
284    }
285
286    /// Trigger immediate execution of a task
287    pub async fn trigger(&self, name: &str) -> Result<()> {
288        let task = self
289            .scheduler
290            .get_task_by_name(name)
291            .ok_or_else(|| pollen_types::PollenError::TaskNameNotFound(name.to_string()))?;
292        self.scheduler.trigger(&task.id, None).await?;
293        Ok(())
294    }
295
296    /// Trigger a task with payload
297    pub async fn trigger_with_payload(&self, name: &str, payload: bytes::Bytes) -> Result<()> {
298        let task = self
299            .scheduler
300            .get_task_by_name(name)
301            .ok_or_else(|| pollen_types::PollenError::TaskNameNotFound(name.to_string()))?;
302        self.scheduler.trigger(&task.id, Some(payload)).await?;
303        Ok(())
304    }
305
306    /// Get task information
307    pub fn task_info(&self, name: &str) -> Option<TaskInfo> {
308        self.scheduler.get_task_by_name(name).map(|task| TaskInfo {
309            id: task.id.clone(),
310            name: task.name.clone(),
311            enabled: task.enabled,
312            last_run: None, // TODO: Query from store
313            next_run: None, // TODO: Compute from schedule
314        })
315    }
316
317    /// List all registered tasks
318    pub fn list_tasks(&self) -> Vec<TaskDef> {
319        self.scheduler.list_tasks()
320    }
321
322    /// Get cluster state (if in cluster mode)
323    pub fn cluster_state(&self) -> Option<ClusterState> {
324        if let Some(ref membership) = self.membership {
325            let members = membership.alive_members();
326            Some(ClusterState {
327                local_node: self.node_id,
328                nodes: members
329                    .into_iter()
330                    .map(|m| NodeInfo {
331                        id: m.id,
332                        addr: m.addr,
333                        state: match m.state {
334                            pollen_types::MemberState::Alive => NodeState::Alive,
335                            pollen_types::MemberState::Suspect => NodeState::Suspect,
336                            pollen_types::MemberState::Dead | pollen_types::MemberState::Left => {
337                                NodeState::Dead
338                            }
339                        },
340                    })
341                    .collect(),
342            })
343        } else {
344            None
345        }
346    }
347
348    /// Get the number of nodes in the cluster
349    pub fn cluster_size(&self) -> usize {
350        self.membership
351            .as_ref()
352            .map(|m| m.alive_members().len())
353            .unwrap_or(1)
354    }
355
356    /// Shutdown the scheduler gracefully
357    pub async fn shutdown(self) {
358        info!("Shutting down Pollen scheduler...");
359        self.shutdown.cancel();
360
361        // Shutdown membership if in cluster mode
362        if let Some(membership) = &self.membership {
363            membership.shutdown().await;
364        }
365
366        // Give background tasks time to complete
367        tokio::time::sleep(Duration::from_millis(100)).await;
368        info!("Pollen scheduler shutdown complete");
369    }
370}
371
372/// Task builder for fluent task definition
373pub struct TaskBuilder {
374    name: String,
375    scheduler: SharedScheduler,
376    schedule: Option<Schedule>,
377    handler: Option<Arc<dyn pollen_executor::TaskHandler>>,
378    retry_policy: Option<pollen_types::RetryPolicy>,
379    timeout: Option<Duration>,
380}
381
382impl TaskBuilder {
383    fn new(name: String, scheduler: SharedScheduler) -> Self {
384        Self {
385            name,
386            scheduler,
387            schedule: None,
388            handler: None,
389            retry_policy: None,
390            timeout: None,
391        }
392    }
393
394    /// Set the task schedule
395    pub fn schedule(mut self, schedule: Schedule) -> Self {
396        self.schedule = Some(schedule);
397        self
398    }
399
400    /// Set the task handler
401    pub fn handler<F, Fut>(mut self, handler: F) -> Self
402    where
403        F: Fn() -> Fut + Send + Sync + 'static,
404        Fut: std::future::Future<Output = Result<()>> + Send + 'static,
405    {
406        self.handler = Some(pollen_executor::simple_handler(handler));
407        self
408    }
409
410    /// Set retry policy
411    pub fn retry(mut self, policy: pollen_types::RetryPolicy) -> Self {
412        self.retry_policy = Some(policy);
413        self
414    }
415
416    /// Set timeout
417    pub fn timeout(mut self, timeout: Duration) -> Self {
418        self.timeout = Some(timeout);
419        self
420    }
421
422    /// Register the task
423    pub async fn register(self) -> Result<()> {
424        let schedule = self.schedule.ok_or_else(|| {
425            pollen_types::PollenError::Config("Task schedule is required".to_string())
426        })?;
427
428        let handler = self.handler.ok_or_else(|| {
429            pollen_types::PollenError::Config("Task handler is required".to_string())
430        })?;
431
432        let mut task = TaskDef::new(&self.name, schedule);
433
434        if let Some(retry) = self.retry_policy {
435            task = task.with_retry(retry);
436        }
437
438        if let Some(timeout) = self.timeout {
439            task = task.with_timeout(timeout);
440        }
441
442        self.scheduler.register(task, handler).await
443    }
444}
445
446/// Task information
447#[derive(Debug, Clone)]
448pub struct TaskInfo {
449    pub id: pollen_types::TaskId,
450    pub name: String,
451    pub enabled: bool,
452    pub last_run: Option<chrono::DateTime<chrono::Utc>>,
453    pub next_run: Option<chrono::DateTime<chrono::Utc>>,
454}
455
456/// Cluster state information
457#[derive(Debug, Clone)]
458pub struct ClusterState {
459    pub local_node: pollen_types::NodeId,
460    pub nodes: Vec<NodeInfo>,
461}
462
463/// Node information in cluster
464#[derive(Debug, Clone)]
465pub struct NodeInfo {
466    pub id: pollen_types::NodeId,
467    pub addr: SocketAddr,
468    pub state: NodeState,
469}
470
/// Liveness state of a cluster member.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeState {
    /// Member is considered alive.
    Alive,
    /// Member is suspected of having failed.
    Suspect,
    /// Member has failed or has left the cluster.
    Dead,
}