1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) 2022 TOYOTA MOTOR CORPORATION. Licensed under MIT OR Apache-2.0.

use std::{sync::Arc, thread, time::Duration};

use crate::stream_engine::autonomous_executor::{
    event_queue::{
        event::{Event, EventTag},
        EventQueue,
    },
    memory_state_machine::{
        MemoryStateMachine, MemoryStateMachineThreshold, MemoryStateTransition,
    },
    performance_metrics::{
        metrics_update_command::metrics_update_by_task_execution::MetricsUpdateByTaskExecutionOrPurge,
        performance_metrics_summary::PerformanceMetricsSummary, PerformanceMetrics,
    },
    pipeline_derivatives::PipelineDerivatives,
    worker::worker_thread::{WorkerThread, WorkerThreadLoopState},
};

/// Marker type for the memory-state-machine worker.
///
/// The type carries no data. Its behavior is defined by the `WorkerThread` implementation in
/// this file, which subscribes to `ReportMetricsSummary` events and, when the memory state
/// machine reports a transition, publishes a `TransitMemoryState` event.
#[derive(Debug)]
pub(super) struct MemoryStateMachineWorkerThread;

/// Arguments handed to [`MemoryStateMachineWorkerThread`].
///
/// A constructor is generated by the `new` derive.
/// NOTE(review): presumably the generated constructor takes the fields in declaration order,
/// so reordering them would change its signature — confirm before touching the field order.
#[derive(Debug, new)]
pub(in crate::stream_engine::autonomous_executor) struct MemoryStateMachineWorkerThreadArg {
    /// Threshold passed to `MemoryStateMachine::new` when the loop state is created.
    threshold: MemoryStateMachineThreshold,
    /// How long each main-loop cycle sleeps, in milliseconds.
    memory_state_transition_interval_msec: u32,
}

/// State carried from one loop cycle / event handler of the worker to the next.
#[derive(Debug)]
pub(super) struct MemoryStateMachineWorkerLoopState {
    /// State machine that is fed the reported queue byte total and may yield a transition.
    memory_state_machine: MemoryStateMachine,
}

impl WorkerThreadLoopState for MemoryStateMachineWorkerLoopState {
    type ThreadArg = MemoryStateMachineWorkerThreadArg;

    /// Builds the initial loop state: a fresh [`MemoryStateMachine`] configured with the
    /// threshold found in `thread_arg`.
    fn new(thread_arg: &Self::ThreadArg) -> Self
    where
        Self: Sized,
    {
        Self {
            memory_state_machine: MemoryStateMachine::new(thread_arg.threshold),
        }
    }

    /// Always reports `true` for this state.
    // NOTE(review): the exact meaning of "integral" is defined by the `WorkerThreadLoopState`
    // trait, which is not visible here — presumably "fully initialized"; confirm.
    fn is_integral(&self) -> bool {
        true
    }
}

impl WorkerThread for MemoryStateMachineWorkerThread {
    const THREAD_NAME: &'static str = "MemoryStateMachineWorker";

    type ThreadArg = MemoryStateMachineWorkerThreadArg;

    type LoopState = MemoryStateMachineWorkerLoopState;

    /// Events this worker asks for: only `ReportMetricsSummary`.
    fn event_subscription() -> Vec<EventTag> {
        vec![EventTag::ReportMetricsSummary]
    }

    /// One idle cycle of the main loop.
    ///
    /// No work happens here; the thread sleeps for
    /// `memory_state_transition_interval_msec` milliseconds and returns the state unchanged.
    /// The actual work is done in [`Self::ev_report_metrics_summary`].
    fn main_loop_cycle(
        current_state: Self::LoopState,
        thread_arg: &Self::ThreadArg,
        _event_queue: &EventQueue,
    ) -> Self::LoopState {
        // Lossless widening from u32 to the u64 that `Duration::from_millis` expects.
        let interval_msec = u64::from(thread_arg.memory_state_transition_interval_msec);
        thread::sleep(Duration::from_millis(interval_msec));
        current_state
    }

    /// Not in [`Self::event_subscription`]; panics via `unreachable!()` if ever invoked.
    fn ev_update_pipeline(
        _current_state: Self::LoopState,
        _pipeline_derivatives: Arc<PipelineDerivatives>,
        _thread_arg: &Self::ThreadArg,
        _event_queue: Arc<EventQueue>,
    ) -> Self::LoopState {
        unreachable!()
    }

    /// Not in [`Self::event_subscription`]; panics via `unreachable!()` if ever invoked.
    fn ev_replace_performance_metrics(
        _current_state: Self::LoopState,
        _metrics: Arc<PerformanceMetrics>,
        _thread_arg: &Self::ThreadArg,
        _event_queue: Arc<EventQueue>,
    ) -> Self::LoopState {
        unreachable!()
    }

    /// Not in [`Self::event_subscription`]; panics via `unreachable!()` if ever invoked.
    fn ev_incremental_update_metrics(
        _current_state: Self::LoopState,
        _metrics: Arc<MetricsUpdateByTaskExecutionOrPurge>,
        _thread_arg: &Self::ThreadArg,
        _event_queue: Arc<EventQueue>,
    ) -> Self::LoopState {
        unreachable!()
    }

    /// Feeds the reported `queue_total_bytes` into the memory state machine.
    ///
    /// When the state machine yields a transition, it is logged at `warn` level and published
    /// on `event_queue` as `Event::TransitMemoryState`. The (possibly updated) loop state is
    /// returned in either case.
    fn ev_report_metrics_summary(
        current_state: Self::LoopState,
        metrics_summary: Arc<PerformanceMetricsSummary>,
        _thread_arg: &Self::ThreadArg,
        event_queue: Arc<EventQueue>,
    ) -> Self::LoopState {
        let mut loop_state = current_state;

        let maybe_transition = loop_state
            .memory_state_machine
            .update_memory_usage(metrics_summary.queue_total_bytes);

        if let Some(memory_state_transition) = maybe_transition {
            log::warn!(
                "[MemoryStateMachineWorker] Memory state transition: {:?}",
                memory_state_transition
            );
            let transit_event = Event::TransitMemoryState {
                memory_state_transition: Arc::new(memory_state_transition),
            };
            event_queue.publish(transit_event);
        }

        loop_state
    }

    /// Not in [`Self::event_subscription`]; panics via `unreachable!()` if ever invoked.
    fn ev_transit_memory_state(
        _current_state: Self::LoopState,
        _memory_state_transition: Arc<MemoryStateTransition>,
        _thread_arg: &Self::ThreadArg,
        _event_queue: Arc<EventQueue>,
    ) -> Self::LoopState {
        unreachable!()
    }
}