assemblyline_models/messages/
dispatcher_heartbeat.rs

1use serde::{Deserialize, Serialize};
2
3// from assemblyline import odm
4// from assemblyline.odm.messages import PerformanceTimer
5
6// MSG_TYPES = {"DispatcherHeartbeat"}
7// LOADER_CLASS = "assemblyline.odm.messages.dispatcher_heartbeat.DispatcherMessage"
8
9
10// @odm.model(description="Queue Model")
11// class Queues(odm.Model):
12//     ingest = odm.Integer(description="Number of submissions in ingest queue")
13//     start = odm.List(odm.Integer(), description="Number of submissions that started")
14//     result = odm.List(odm.Integer(), description="Number of results in queue")
15//     command = odm.List(odm.Integer(), description="Number of commands in queue")
16
17
18// @odm.model(description="Inflight Model")
19// class Inflight(odm.Model):
20//     max = odm.Integer(description="Maximum number of submissions")
21//     outstanding = odm.Integer(description="Number of outstanding submissions")
22//     per_instance = odm.List(odm.Integer(), description="Number of submissions per Dispatcher instance")
23
24
25/// Metrics Model
26#[derive(Debug, Default, Serialize, Deserialize)]
27pub struct Metrics {
28    /// Number of files completed
29    pub files_completed: i32,
30    /// Number of submissions completed
31    pub submissions_completed: i32,
32    /// Number of service timeouts
33    pub service_timeouts: i32,
34    /// CPU time
35    #[serde(flatten)]
36    pub cpu_seconds: CPUSeconds,
37    /// CPU count
38    // pub cpu_seconds_count: i32,
39    /// Busy CPU time
40    #[serde(flatten)]
41    pub busy_seconds: BusySeconds,
42    // pub busy_seconds_count: i32,
43    /// Processed submissions waiting to be saved
44    pub save_queue: i32,
45    /// Errors waiting to be saved
46    pub error_queue: i32,
47}
48
49
50#[derive(Serialize, Deserialize, Default, Debug)]
51pub struct CPUSeconds {
52    #[serde(rename="cpu_seconds.c")]
53    pub count: i32,
54    #[serde(rename="cpu_seconds.t")]
55    pub total: f64,
56}
57
58impl CPUSeconds {
59    pub fn increment(&mut self, time: f64) {
60        self.count += 1;
61        self.total += time;
62    }
63}
64
65
66#[derive(Serialize, Deserialize, Default, Debug)]
67pub struct BusySeconds {
68    #[serde(rename="busy_seconds.c")]
69    pub count: i32,
70    #[serde(rename="busy_seconds.t")]
71    pub total: f64,
72}
73
74
75// @odm.model(description="Heartbeat Model")
76// class Heartbeat(odm.Model):
77//     inflight = odm.Compound(Inflight, description="Inflight submissions")
78//     instances = odm.Integer(description="Number of instances")
79//     metrics = odm.Compound(Metrics, description="Dispatcher metrics")
80//     queues = odm.Compound(Queues, description="Dispatcher queues")
81//     component = odm.Keyword(description="Component name")
82
83
84// @odm.model(description="Model of Dispatcher Heartbeat Messages")
85// class DispatcherMessage(odm.Model):
86//     msg = odm.Compound(Heartbeat, description="Heartbeat message")
87//     msg_loader = odm.Enum(values={LOADER_CLASS}, default=LOADER_CLASS, description="Loader class for message")
88//     msg_type = odm.Enum(values=MSG_TYPES, default="DispatcherHeartbeat", description="Type of message")
89//     sender = odm.Keyword(description="Sender of message")