1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use std::{sync::Arc, thread, time::Duration};
use crate::stream_engine::autonomous_executor::{
event_queue::{
event::{Event, EventTag},
EventQueue,
},
memory_state_machine::{
MemoryStateMachine, MemoryStateMachineThreshold, MemoryStateTransition,
},
performance_metrics::{
metrics_update_command::metrics_update_by_task_execution::MetricsUpdateByTaskExecutionOrPurge,
performance_metrics_summary::PerformanceMetricsSummary, PerformanceMetrics,
},
pipeline_derivatives::PipelineDerivatives,
worker::worker_thread::{WorkerThread, WorkerThreadLoopState},
};
#[derive(Debug)]
pub(super) struct MemoryStateMachineWorkerThread;
#[derive(Debug, new)]
pub(in crate::stream_engine::autonomous_executor) struct MemoryStateMachineWorkerThreadArg {
threshold: MemoryStateMachineThreshold,
memory_state_transition_interval_msec: u32,
}
#[derive(Debug)]
pub(super) struct MemoryStateMachineWorkerLoopState {
memory_state_machine: MemoryStateMachine,
}
impl WorkerThreadLoopState for MemoryStateMachineWorkerLoopState {
type ThreadArg = MemoryStateMachineWorkerThreadArg;
fn new(thread_arg: &Self::ThreadArg) -> Self
where
Self: Sized,
{
let memory_state_machine = MemoryStateMachine::new(thread_arg.threshold);
Self {
memory_state_machine,
}
}
fn is_integral(&self) -> bool {
true
}
}
impl WorkerThread for MemoryStateMachineWorkerThread {
const THREAD_NAME: &'static str = "MemoryStateMachineWorker";
type ThreadArg = MemoryStateMachineWorkerThreadArg;
type LoopState = MemoryStateMachineWorkerLoopState;
fn event_subscription() -> Vec<EventTag> {
vec![EventTag::ReportMetricsSummary]
}
fn main_loop_cycle(
current_state: Self::LoopState,
thread_arg: &Self::ThreadArg,
_event_queue: &EventQueue,
) -> Self::LoopState {
thread::sleep(Duration::from_millis(
thread_arg.memory_state_transition_interval_msec as u64,
));
current_state
}
fn ev_update_pipeline(
_current_state: Self::LoopState,
_pipeline_derivatives: Arc<PipelineDerivatives>,
_thread_arg: &Self::ThreadArg,
_event_queue: Arc<EventQueue>,
) -> Self::LoopState {
unreachable!()
}
fn ev_replace_performance_metrics(
_current_state: Self::LoopState,
_metrics: Arc<PerformanceMetrics>,
_thread_arg: &Self::ThreadArg,
_event_queue: Arc<EventQueue>,
) -> Self::LoopState {
unreachable!()
}
fn ev_incremental_update_metrics(
_current_state: Self::LoopState,
_metrics: Arc<MetricsUpdateByTaskExecutionOrPurge>,
_thread_arg: &Self::ThreadArg,
_event_queue: Arc<EventQueue>,
) -> Self::LoopState {
unreachable!()
}
fn ev_report_metrics_summary(
current_state: Self::LoopState,
metrics_summary: Arc<PerformanceMetricsSummary>,
_thread_arg: &Self::ThreadArg,
event_queue: Arc<EventQueue>,
) -> Self::LoopState {
let mut state = current_state;
let bytes = metrics_summary.queue_total_bytes;
if let Some(transition) = state.memory_state_machine.update_memory_usage(bytes) {
log::warn!(
"[MemoryStateMachineWorker] Memory state transition: {:?}",
transition
);
event_queue.publish(Event::TransitMemoryState {
memory_state_transition: Arc::new(transition),
})
}
state
}
fn ev_transit_memory_state(
_current_state: Self::LoopState,
_memory_state_transition: Arc<MemoryStateTransition>,
_thread_arg: &Self::ThreadArg,
_event_queue: Arc<EventQueue>,
) -> Self::LoopState {
unreachable!()
}
}