springql_core/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/
join_subtask.rs1use std::sync::{Mutex, MutexGuard};
4
5use crate::{
6 expr_resolver::ExprResolver,
7 pipeline::{JoinParameter, WindowParameter},
8 stream_engine::autonomous_executor::{
9 performance_metrics::WindowInFlowByWindowTask,
10 task::{
11 tuple::Tuple,
12 window::{JoinDir, JoinWindow, Window},
13 },
14 },
15};
16
17#[derive(Debug)]
18pub struct JoinSubtask(Mutex<JoinWindow>);
19
20impl JoinSubtask {
21 pub fn new(window_param: WindowParameter, join_param: JoinParameter) -> Self {
22 let window = JoinWindow::new(window_param, join_param);
23 Self(Mutex::new(window))
24 }
25
26 pub fn run(
27 &self,
28 expr_resolver: &ExprResolver,
29 tuple: Tuple,
30 dir: JoinDir,
31 ) -> (Vec<Tuple>, WindowInFlowByWindowTask) {
32 self.0
33 .lock()
34 .expect("another thread accessing to window gets poisoned")
35 .dispatch(expr_resolver, tuple, dir)
36 .expect("dispatch failed")
37 }
38
39 pub fn get_window_mut(&self) -> MutexGuard<JoinWindow> {
40 self.0
41 .lock()
42 .expect("another thread accessing to window gets poisoned")
43 }
44}