1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use std::sync::Arc;
use crate::mem_size::MemSize;
use crate::pipeline::name::ColumnName;
use crate::pipeline::pipeline_graph::PipelineGraph;
use crate::pipeline::stream_model::StreamModel;
use crate::stream_engine::autonomous_executor::performance_metrics::metrics_update_command::metrics_update_by_task_execution::OutQueueMetricsUpdateByTask;
use crate::stream_engine::autonomous_executor::row::Row;
use crate::stream_engine::autonomous_executor::task::task_context::TaskContext;
use crate::stream_engine::autonomous_executor::task_graph::queue_id::QueueId;
use crate::stream_engine::command::insert_plan::InsertPlan;
use super::query_subtask::SqlValues;
/// Subtask that turns sequences of SQL values into rows of a target stream
/// and puts those rows into the output queues of the running task.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct InsertSubtask {
/// Target stream, looked up in the pipeline graph by the insert plan's stream name.
into_stream: Arc<StreamModel>,
/// Column order copied from the insert plan; handed to `SqlValues::into_row` for every row.
column_order: Vec<ColumnName>,
}
/// Result of running an `InsertSubtask`.
#[derive(Debug, new)]
pub(in crate::stream_engine::autonomous_executor) struct InsertSubtaskOut {
/// One metrics update per output queue the rows were put into.
/// Empty when the subtask was run with no values.
pub(in crate::stream_engine::autonomous_executor) out_queues_metrics_update:
Vec<OutQueueMetricsUpdateByTask>,
}
impl InsertSubtask {
pub(in crate::stream_engine::autonomous_executor) fn new(
plan: &InsertPlan,
pipeline_graph: &PipelineGraph,
) -> Self {
let into_stream = pipeline_graph
.get_stream(plan.stream())
.expect("plan has invalid stream name");
Self {
into_stream,
column_order: plan.column_order().to_vec(),
}
}
pub(in crate::stream_engine::autonomous_executor) fn run(
&self,
values_seq: Vec<SqlValues>,
context: &TaskContext,
) -> InsertSubtaskOut {
if values_seq.is_empty() {
InsertSubtaskOut::new(vec![])
} else {
let repos = context.repos();
let row_q_repo = repos.row_queue_repository();
let window_q_repo = repos.window_queue_repository();
let output_queues = context.output_queues();
let rows = values_seq
.into_iter()
.map(|values| values.into_row(self.into_stream.clone(), self.column_order.clone()))
.collect::<Vec<_>>();
let out_queues_metrics_update = output_queues
.into_iter()
.map(|q| match q {
QueueId::Row(queue_id) => {
let row_q = row_q_repo.get(&queue_id);
let out = self.out_queue_metrics_update(queue_id.into(), &rows);
for row in rows.clone() {
row_q.put(row);
}
out
}
QueueId::Window(queue_id) => {
let window_queue = window_q_repo.get(&queue_id);
let out = self.out_queue_metrics_update(queue_id.into(), &rows);
for row in rows.clone() {
window_queue.put(row);
}
out
}
})
.collect();
InsertSubtaskOut::new(out_queues_metrics_update)
}
}
fn out_queue_metrics_update(
&self,
queue_id: QueueId,
rows: &[Row],
) -> OutQueueMetricsUpdateByTask {
let bytes_put: usize = rows.iter().map(|row| row.mem_size()).sum();
OutQueueMetricsUpdateByTask::new(queue_id, rows.len() as u64, bytes_put as u64)
}
}