assemblyline_models/messages/
dispatching.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::types::{Sha256, Sid};
6
7
8
9#[derive(Serialize, Deserialize, Clone, Debug)]
10pub struct SubmissionDispatchMessage {
11    pub submission: crate::datastore::Submission,
12    #[serde(default)]
13    pub completed_queue: Option<String>,
14    #[serde(default)]
15    pub file_infos: HashMap<Sha256, super::task::FileInfo>,
16    #[serde(default)]
17    pub results: HashMap<String, crate::datastore::result::Result>,
18    #[serde(default)]
19    pub file_tree: HashMap<Sha256, FileTreeData>,
20    #[serde(default)]
21    pub errors: Vec<String>,
22}
23
24impl SubmissionDispatchMessage {
25    pub fn new(submission: crate::datastore::Submission, completed_queue: Option<String>) -> Self {
26        Self {
27            submission,
28            completed_queue,
29            file_infos: Default::default(),
30            file_tree: Default::default(),
31            errors: Default::default(),
32            results: Default::default(),
33        }
34    }
35
36    pub fn set_file_infos(mut self, file_infos: HashMap<Sha256, super::task::FileInfo>) -> Self {
37        self.file_infos = file_infos.clone();
38        self
39    }
40
41    pub fn set_file_tree(mut self, file_tree: HashMap<Sha256, FileTreeData>) -> Self {
42        self.file_tree = file_tree;
43        self
44    }
45
46
47    pub fn set_results(mut self, results: HashMap<String, crate::datastore::result::Result>) -> Self {
48        self.results = results;
49        self
50    }
51
52    pub fn set_errors(mut self, errors:  Vec<String>) -> Self {
53        self.errors = errors;
54        self
55    }
56
57
58}
59
60#[derive(Serialize, Deserialize, Default, Clone, Debug)]
61#[serde(default)]
62pub struct FileTreeData {
63    pub name: Vec<String>,
64    pub children: HashMap<Sha256, FileTreeData>
65}
66
67#[derive(Serialize, Deserialize)]
68#[serde(rename_all="UPPERCASE")]
69pub enum WatchQueueStatus {
70    Fail,
71    Ok,
72    Start,
73    Stop
74}
75
76
77/// These are messages sent by dispatcher on the watch queue
78#[derive(Serialize, Deserialize)]
79pub struct WatchQueueMessage {
80    /// Cache key
81    pub cache_key: Option<String>,
82    /// Watch statuses
83    pub status: WatchQueueStatus
84}
85
86impl WatchQueueMessage {
87    pub fn stop() -> Self {
88        Self {
89            cache_key: None,
90            status: WatchQueueStatus::Stop,
91        }
92    }
93
94    pub fn fail(cache_key: String) -> Self {
95        Self {
96            cache_key: Some(cache_key),
97            status: WatchQueueStatus::Fail,
98        }
99    }
100
101    pub fn ok(cache_key: String) -> Self {
102        Self {
103            cache_key: Some(cache_key),
104            status: WatchQueueStatus::Ok,
105        }
106    }
107}
108
109impl From<WatchQueueStatus> for WatchQueueMessage {
110    fn from(value: WatchQueueStatus) -> Self {
111        Self {
112            cache_key: None,
113            status: value
114        }
115    }
116}
117
118
119#[derive(Serialize, Deserialize)]
120#[serde(rename_all="SCREAMING_SNAKE_CASE")]
121pub enum MessageClasses {
122    CreateWatch,
123    ListOutstanding,
124    UpdateBadSid
125}
126
127
128/// Create Watch Message
129#[derive(Serialize, Deserialize)]
130pub struct CreateWatch {
131    /// Name of queue
132    pub queue_name: String,
133    /// Submission ID
134    pub submission: Sid,
135}
136
137/// List Outstanding Message
138#[derive(Serialize, Deserialize)]
139pub struct ListOutstanding {
140    /// Response queue
141    pub response_queue: String,
142    /// Submission ID
143    pub submission: Sid,
144}
145
146/// Model of Dispatcher Command Message
147#[derive(Serialize, Deserialize)]
148pub struct DispatcherCommandMessage {
149    /// Kind of message
150    kind: MessageClasses,
151    /// Message payload
152    payload_data: serde_json::Value,
153}
154
155impl DispatcherCommandMessage {
156    pub fn payload(&self) -> Result<DispatcherCommand, serde_json::Error> {
157        Ok(match self.kind {
158            MessageClasses::CreateWatch => DispatcherCommand::CreateWatch(serde_json::from_value(self.payload_data.clone())?),
159            MessageClasses::ListOutstanding => DispatcherCommand::ListOutstanding(serde_json::from_value(self.payload_data.clone())?),
160            MessageClasses::UpdateBadSid => DispatcherCommand::UpdateBadSid(serde_json::from_value(self.payload_data.clone())?),
161        })
162    }
163}
164
165
166/// Model of Dispatcher Command Message
167#[derive(Serialize, Deserialize)]
168pub enum DispatcherCommand {
169    /// Create Watch Message
170    CreateWatch(CreateWatch),
171
172    /// List Outstanding Message
173    ListOutstanding(ListOutstanding),
174
175    /// let the dispatcher know that the bad sid list has changed
176    UpdateBadSid(String),
177}