// assemblyline_models/messages/dispatching.rs

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::{Sha256, Sid};

9#[derive(Serialize, Deserialize, Clone)]
10pub struct SubmissionDispatchMessage {
11    pub submission: crate::datastore::Submission,
12    #[serde(default)]
13    pub completed_queue: Option<String>,
14    #[serde(default)]
15    pub file_infos: HashMap<Sha256, super::task::FileInfo>,
16    #[serde(default)]
17    pub results: HashMap<String, crate::datastore::result::Result>,
18    #[serde(default)]
19    pub file_tree: HashMap<Sha256, FileTreeData>,
20    #[serde(default)]
21    pub errors: Vec<String>,
22}
23
24impl SubmissionDispatchMessage {
25    pub fn simple(submission: crate::datastore::Submission, completed_queue: Option<String>) -> Self {
26        Self {
27            submission,
28            completed_queue,
29            file_infos: Default::default(),
30            file_tree: Default::default(),
31            errors: Default::default(),
32            results: Default::default(),
33        }
34    }
35}
36
37#[derive(Serialize, Deserialize, Default, Clone)]
38#[serde(default)]
39pub struct FileTreeData {
40    pub name: Vec<String>,
41    pub children: HashMap<Sha256, FileTreeData>
42}
43
44#[derive(Serialize, Deserialize)]
45#[serde(rename_all="UPPERCASE")]
46pub enum WatchQueueStatus {
47    Fail, 
48    Ok, 
49    Start, 
50    Stop
51}
52
53
54/// These are messages sent by dispatcher on the watch queue
55#[derive(Serialize, Deserialize)]
56pub struct WatchQueueMessage {
57    /// Cache key
58    pub cache_key: Option<String>,
59    /// Watch statuses
60    pub status: WatchQueueStatus
61}
62
63impl WatchQueueMessage {
64    pub fn stop() -> Self {
65        Self {
66            cache_key: None,
67            status: WatchQueueStatus::Stop,
68        }
69    }
70
71    pub fn fail(cache_key: String) -> Self {
72        Self {
73            cache_key: Some(cache_key),
74            status: WatchQueueStatus::Fail,
75        }
76    }
77
78    pub fn ok(cache_key: String) -> Self {
79        Self {
80            cache_key: Some(cache_key),
81            status: WatchQueueStatus::Ok,
82        }
83    }
84}
85
86impl From<WatchQueueStatus> for WatchQueueMessage {
87    fn from(value: WatchQueueStatus) -> Self {
88        Self {
89            cache_key: None,
90            status: value
91        }
92    }
93}
94
95
96#[derive(Serialize, Deserialize)]
97#[serde(rename_all="SCREAMING_SNAKE_CASE")]
98pub enum MessageClasses {
99    CreateWatch,
100    ListOutstanding,
101    UpdateBadSid
102}
103
104
105/// Create Watch Message
106#[derive(Serialize, Deserialize)]
107pub struct CreateWatch {
108    /// Name of queue
109    pub queue_name: String,
110    /// Submission ID
111    pub submission: Sid,
112}
113
114/// List Outstanding Message
115#[derive(Serialize, Deserialize)]
116pub struct ListOutstanding {
117    /// Response queue
118    pub response_queue: String,
119    /// Submission ID
120    pub submission: Sid,
121}
122
123/// Model of Dispatcher Command Message
124#[derive(Serialize, Deserialize)]
125pub struct DispatcherCommandMessage {
126    /// Kind of message
127    kind: MessageClasses,
128    /// Message payload
129    payload_data: serde_json::Value,
130}
131
132impl DispatcherCommandMessage {
133    pub fn payload(&self) -> Result<DispatcherCommand, serde_json::Error> {
134        Ok(match self.kind {
135            MessageClasses::CreateWatch => DispatcherCommand::CreateWatch(serde_json::from_value(self.payload_data.clone())?),
136            MessageClasses::ListOutstanding => DispatcherCommand::ListOutstanding(serde_json::from_value(self.payload_data.clone())?),
137            MessageClasses::UpdateBadSid => DispatcherCommand::UpdateBadSid(serde_json::from_value(self.payload_data.clone())?),
138        })
139    }
140}
141
142
143/// Model of Dispatcher Command Message
144#[derive(Serialize, Deserialize)]
145pub enum DispatcherCommand {
146    /// Create Watch Message
147    CreateWatch(CreateWatch),
148
149    /// List Outstanding Message
150    ListOutstanding(ListOutstanding),
151
152    /// let the dispatcher know that the bad sid list has changed
153    UpdateBadSid(String),
154}
155
156