springql_core/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/
join_subtask.rs

1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3use std::sync::{Mutex, MutexGuard};
4
5use crate::{
6    expr_resolver::ExprResolver,
7    pipeline::{JoinParameter, WindowParameter},
8    stream_engine::autonomous_executor::{
9        performance_metrics::WindowInFlowByWindowTask,
10        task::{
11            tuple::Tuple,
12            window::{JoinDir, JoinWindow, Window},
13        },
14    },
15};
16
17#[derive(Debug)]
18pub struct JoinSubtask(Mutex<JoinWindow>);
19
20impl JoinSubtask {
21    pub fn new(window_param: WindowParameter, join_param: JoinParameter) -> Self {
22        let window = JoinWindow::new(window_param, join_param);
23        Self(Mutex::new(window))
24    }
25
26    pub fn run(
27        &self,
28        expr_resolver: &ExprResolver,
29        tuple: Tuple,
30        dir: JoinDir,
31    ) -> (Vec<Tuple>, WindowInFlowByWindowTask) {
32        self.0
33            .lock()
34            .expect("another thread accessing to window gets poisoned")
35            .dispatch(expr_resolver, tuple, dir)
36            .expect("dispatch failed")
37    }
38
39    pub fn get_window_mut(&self) -> MutexGuard<JoinWindow> {
40        self.0
41            .lock()
42            .expect("another thread accessing to window gets poisoned")
43    }
44}