assemblyline_models/messages/
ingest_heartbeat.rs

1use serde::{Deserialize, Serialize};
2
3// from assemblyline import odm
4// from assemblyline.odm.messages import PerformanceTimer
5
6// MSG_TYPES = {"IngestHeartbeat"}
7// LOADER_CLASS = "assemblyline.odm.messages.ingest_heartbeat.IngestMessage"
8
9
10// @odm.model(description="Queues")
11// class Queues(odm.Model):
12//     critical = odm.Integer(description="Size of the critical priority queue")
13//     high = odm.Integer(description="Size of the high priority queue")
14//     ingest = odm.Integer(description="Size of the ingest queue")
15//     complete = odm.Integer(description="Size of the complete queue")
16//     low = odm.Integer(description="Size of the low priority queue")
17//     medium = odm.Integer(description="Size of the medium priority queue")
18
19
20
21/// Metrics
22#[derive(Serialize, Deserialize, Default)]
23pub struct Metrics {
24    /// Number of cache misses
25    pub cache_miss: u32,
26    /// Number of cache expires
27    pub cache_expired: u32,
28    /// Number of cache stales
29    pub cache_stale: u32,
30    /// Number of cache local hits
31    pub cache_hit_local: u32,
32    /// Number of cache hits
33    pub cache_hit: u32,
34    /// Number of bytes completed
35    pub bytes_completed: u64,
36    /// Number of bytes ingested
37    pub bytes_ingested: u64,
38    /// Number of duplicate submissions
39    pub duplicates: u32,
40    /// Number of errors
41    pub error: u32,
42    /// Number of completed files
43    pub files_completed: u32,
44    /// Number of skipped files
45    pub skipped: u32,
46    /// Number of completed submissions
47    pub submissions_completed: u32,
48    /// Number of ingested submissions
49    pub submissions_ingested: u32,
50    /// Number of timed_out submissions
51    pub timed_out: u32,
52    /// Number of safelisted submissions
53    pub whitelisted: u32,
54    /// Number of retried submissions
55    pub retries: u32,
56
57    /// Counter to track used cpu time
58    #[serde(flatten)]
59    pub cpu_seconds: CPUSeconds,
60
61    // /// Used on metrics output to represent part cpu_seconds by the python module.
62    // pub cpu_seconds_count: i32,
63
64    /// Depricated
65    #[serde(flatten)]
66    pub busy_seconds: BusySeconds,
67    // pub busy_seconds_count: i32,
68}
69
70#[derive(Serialize, Deserialize, Default)]
71pub struct CPUSeconds {
72    #[serde(rename="cpu_seconds.c")]
73    pub count: i32,
74    #[serde(rename="cpu_seconds.t")]
75    pub total: f64,
76}
77
78impl CPUSeconds {
79    pub fn increment(&mut self, time: f64) {
80        self.count += 1;
81        self.total += time;
82    }
83}
84
85
86#[derive(Serialize, Deserialize, Default)]
87pub struct BusySeconds {
88    #[serde(rename="busy_seconds.c")]
89    pub count: i32,
90    #[serde(rename="busy_seconds.t")]
91    pub total: f64,
92}
93
94
95// @odm.model(description="Processing")
96// class Processing(odm.Model):
97//     inflight = odm.Integer(description="Number of inflight submissions")
98
99
100
101// @odm.model(description="Chance of Processing")
102// class ProcessingChance(odm.Model):
103//     critical = odm.Float(description="Chance of processing critical items")
104//     high = odm.Float(description="Chance of processing high items")
105//     low = odm.Float(description="Chance of processing low items")
106//     medium = odm.Float(description="Chance of processing medium items")
107
108
109// @odm.model(description="Heartbeat Model")
110// class Heartbeat(odm.Model):
111//     instances = odm.Integer(description="Number of ingest processes")
112//     metrics = odm.Compound(Metrics, description="Metrics")
113//     processing = odm.Compound(Processing, description="Inflight queue sizes")
114//     processing_chance = odm.Compound(ProcessingChance, description="Chance of processing items")
115//     queues = odm.Compound(Queues, description="Queue lengths block")
116
117
118// @odm.model(description="Model of Ingester Heartbeat Message")
119// class IngestMessage(odm.Model):
120//     msg = odm.Compound(Heartbeat, description="Heartbeat message")
121//     msg_loader = odm.Enum(values={LOADER_CLASS}, default=LOADER_CLASS, description="Loader class for message")
122//     msg_type = odm.Enum(values=MSG_TYPES, default="IngestHeartbeat", description="Type of message")
123//     sender = odm.Keyword(description="Sender of message")