springql_core/stream_engine/autonomous_executor/task_executor/
source_worker_pool.rs

1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3mod source_worker;
4
5use std::{cell::RefCell, sync::Arc};
6
7use crate::{
8    api::SpringWorkerConfig,
9    stream_engine::autonomous_executor::{
10        args::{Coordinators, EventQueues, Locks},
11        repositories::Repositories,
12        task_executor::{
13            source_worker_pool::source_worker::SourceWorker,
14            task_worker_thread_handler::{TaskWorkerId, TaskWorkerThreadArg},
15        },
16    },
17};
18
/// Pool that owns the `SourceWorker`s created by [`SourceWorkerPool::new`].
///
/// NOTE(review): going by the type name these workers presumably execute source tasks
/// (not pump or sink tasks) — confirm against `SourceWorker`.
#[derive(Debug)]
pub struct SourceWorkerPool {
    /// The workers owned by this pool.
    ///
    /// The worker pool is interrupted by the task executor on events such as a pipeline update.
    /// Because the holder of the pool cannot always be mutable, the workers are kept behind
    /// interior mutability.
    ///
    /// Design assumption: mutation of the workers only happens while a task executor lock
    /// (e.g. `PipelineUpdateLockGuard`) is held, so a `RefCell` is used rather than a `Mutex`
    /// or `RwLock` to spare the workers the cost of locking.
    ///
    /// The field is never read in this file; the pool simply keeps the workers owned
    /// (hence the leading underscore).
    _workers: RefCell<Vec<SourceWorker>>,
}
29
30impl SourceWorkerPool {
31    pub fn new(
32        config: &SpringWorkerConfig,
33        locks: Locks,
34        event_queues: EventQueues,
35        coordinators: Coordinators,
36        repos: Arc<Repositories>,
37    ) -> Self {
38        let workers = (0..config.n_source_worker_threads)
39            .map(|id| {
40                let arg = TaskWorkerThreadArg::new(
41                    TaskWorkerId::new(id as u16),
42                    locks.task_executor_lock.clone(),
43                    repos.clone(),
44                    config.sleep_msec_no_row,
45                );
46                SourceWorker::new(
47                    locks.main_job_lock.clone(),
48                    event_queues.clone(),
49                    coordinators.clone(),
50                    arg,
51                )
52            })
53            .collect();
54        Self {
55            _workers: RefCell::new(workers),
56        }
57    }
58}