1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use std::sync::Arc;

use crate::mem_size::MemSize;
use crate::pipeline::name::ColumnName;
use crate::pipeline::pipeline_graph::PipelineGraph;
use crate::pipeline::stream_model::StreamModel;
use crate::stream_engine::autonomous_executor::performance_metrics::metrics_update_command::metrics_update_by_task_execution::OutQueueMetricsUpdateByTask;
use crate::stream_engine::autonomous_executor::row::Row;
use crate::stream_engine::autonomous_executor::task::task_context::TaskContext;
use crate::stream_engine::autonomous_executor::task_graph::queue_id::QueueId;
use crate::stream_engine::command::insert_plan::InsertPlan;

use super::query_subtask::SqlValues;

/// Subtask that turns sequences of SQL values into rows of a target stream and
/// puts them into the output queues of the running task (see `run`).
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct InsertSubtask {
    /// Model of the stream the rows are built for; resolved from the insert plan's
    /// stream name when the subtask is constructed.
    into_stream: Arc<StreamModel>,

    /// Column list of the INSERT statement, copied from the insert plan, e.g. the
    /// `(c2, c3, c1)` part of `INSERT INTO stream (c2, c3, c1)`.
    /// It is handed to `SqlValues::into_row` together with `into_stream`.
    column_order: Vec<ColumnName>,
}

/// Result of `InsertSubtask::run`.
#[derive(Debug, new)]
pub(in crate::stream_engine::autonomous_executor) struct InsertSubtaskOut {
    /// One metrics update per output queue that rows were put into.
    /// Empty when `run` was called with no values.
    pub(in crate::stream_engine::autonomous_executor) out_queues_metrics_update:
        Vec<OutQueueMetricsUpdateByTask>,
}

impl InsertSubtask {
    /// # Panics
    ///
    /// `plan` has invalid stream name
    pub(in crate::stream_engine::autonomous_executor) fn new(
        plan: &InsertPlan,
        pipeline_graph: &PipelineGraph,
    ) -> Self {
        let into_stream = pipeline_graph
            .get_stream(plan.stream())
            .expect("plan has invalid stream name");
        Self {
            into_stream,
            column_order: plan.column_order().to_vec(),
        }
    }

    pub(in crate::stream_engine::autonomous_executor) fn run(
        &self,
        values_seq: Vec<SqlValues>,
        context: &TaskContext,
    ) -> InsertSubtaskOut {
        if values_seq.is_empty() {
            InsertSubtaskOut::new(vec![])
        } else {
            let repos = context.repos();
            let row_q_repo = repos.row_queue_repository();
            let window_q_repo = repos.window_queue_repository();
            let output_queues = context.output_queues();

            let rows = values_seq
                .into_iter()
                .map(|values| values.into_row(self.into_stream.clone(), self.column_order.clone()))
                .collect::<Vec<_>>();

            let out_queues_metrics_update = output_queues
                .into_iter()
                .map(|q| match q {
                    QueueId::Row(queue_id) => {
                        let row_q = row_q_repo.get(&queue_id);
                        let out = self.out_queue_metrics_update(queue_id.into(), &rows);
                        for row in rows.clone() {
                            row_q.put(row);
                        }
                        out
                    }
                    QueueId::Window(queue_id) => {
                        let window_queue = window_q_repo.get(&queue_id);
                        let out = self.out_queue_metrics_update(queue_id.into(), &rows);
                        for row in rows.clone() {
                            window_queue.put(row);
                        }
                        out
                    }
                })
                .collect();

            InsertSubtaskOut::new(out_queues_metrics_update)
        }
    }

    fn out_queue_metrics_update(
        &self,
        queue_id: QueueId,
        rows: &[Row],
    ) -> OutQueueMetricsUpdateByTask {
        let bytes_put: usize = rows.iter().map(|row| row.mem_size()).sum();
        OutQueueMetricsUpdateByTask::new(queue_id, rows.len() as u64, bytes_put as u64)
    }
}