assemblyline_models/datastore/
submission.rs

1
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use serde_with::{DeserializeFromStr, SerializeDisplay};
9use struct_metadata::Described;
10
11use crate::types::{ClassificationString, ExpandingClassification, JsonMap, ServiceName, Sha256, Sid, Text, UpperString, Wildcard};
12use crate::{ElasticMeta, Readable};
13
14
/// A logging event describing the processing of a submission
#[derive(Serialize, Deserialize, Debug, Described, Clone)]
#[metadata_type(ElasticMeta)]
#[metadata(index=false, store=false)]
pub struct TraceEvent {
    /// Time of the event; set to the current UTC time when missing from the input
    #[serde(default="default_now")]
    pub timestamp: DateTime<Utc>,
    /// Label describing what kind of event this is
    pub event_type: String,
    /// Service the event relates to, if any
    pub service: Option<ServiceName>,
    /// SHA256 of the file the event relates to, if any
    pub file: Option<Sha256>,
    /// Optional free-form message attached to the event
    pub message: Option<String>,
}
27
28fn default_now() -> DateTime<Utc> { Utc::now() }
29
/// Model of Submission
#[derive(Serialize, Deserialize, Debug, Described, Clone)]
#[metadata_type(ElasticMeta)]
#[metadata(index=true, store=true)]
pub struct Submission {
    /// Time at which the submission was archived
    #[serde(default)]
    pub archive_ts: Option<DateTime<Utc>>,
    /// Document is present in the malware archive
    #[serde(default)]
    pub archived: bool,
    /// Classification of the submission (its fields are flattened into this document)
    #[serde(flatten)]
    pub classification: ExpandingClassification,
    /// A log of events describing the processing sequence.
    #[serde(default)]
    #[metadata(store=false, index=false)]
    pub tracing_events: Vec<TraceEvent>,
    /// Total number of errors in the submission
    pub error_count: i32,
    /// List of error keys
    #[metadata(store=false)]
    pub errors: Vec<String>,
    /// Expiry timestamp
    #[serde(default)]
    #[metadata(store=false)]
    pub expiry_ts: Option<DateTime<Utc>>,
    /// Total number of files in the submission
    pub file_count: i32,
    /// List of files that were originally submitted
    pub files: Vec<File>,
    // pub file: File,
    /// Maximum score of all the files in the scan
    pub max_score: i32,
    /// Metadata associated to the submission
    #[serde(default)]
    #[metadata(store=false, mapping="flattenedobject", copyto="__text__")]
    pub metadata: HashMap<String, Wildcard>,
    /// Submission parameter details
    pub params: SubmissionParams,
    /// List of result keys
    #[metadata(store=false)]
    pub results: Vec<Wildcard>,
    /// Submission ID
    #[metadata(copyto="__text__")]
    pub sid: Sid,
    /// Status of the submission
    pub state: SubmissionState,
    /// This document is going to be deleted as soon as it finishes
    #[serde(default)]
    pub to_be_deleted: bool,
    /// Submission-specific times
    #[serde(default)]
    pub times: Times,
    /// Malicious verdict details
    #[serde(default)]
    pub verdict: Verdict,
    /// Was loaded from the archive
    #[serde(default)]
    #[metadata(index=false)]
    pub from_archive: bool,

    /// The filescore key, used in deduplication. This is a non-unique key that is
    /// shared by submissions that may be processed as duplicates.
    #[serde(default)]
    #[metadata(index=false, store=false)]
    pub scan_key: Option<String>,
}
98
#[cfg(feature = "rand")]
impl rand::distr::Distribution<Submission> for rand::distr::StandardUniform {
    /// Produce a freshly submitted `Submission` with one random file and no errors or results.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Submission {
        // The random draws are taken in field-declaration order so a seeded
        // generator yields the same values it always has.
        let archived: bool = rng.random();
        let classification = ExpandingClassification::try_unrestricted().unwrap();
        let files: Vec<File> = vec![rng.random()];
        let max_score: i32 = rng.random();
        let params = SubmissionParams::new(ClassificationString::try_unrestricted().unwrap());
        let sid: Sid = rng.random();

        Submission {
            archive_ts: None,
            archived,
            classification,
            tracing_events: Vec::new(),
            error_count: 0,
            errors: Vec::new(),
            expiry_ts: None,
            file_count: 1,
            files,
            max_score,
            metadata: HashMap::new(),
            params,
            results: Vec::new(),
            sid,
            state: SubmissionState::Submitted,
            to_be_deleted: false,
            // Not completed, submitted "now".
            times: Times::default(),
            // No user verdicts in either direction.
            verdict: Verdict::default(),
            from_archive: false,
            scan_key: None,
        }
    }
}
132
133impl Readable for Submission {
134    fn set_from_archive(&mut self, from_archive: bool) {
135        self.from_archive = from_archive
136    }
137}
138
139
/// Submission Parameters
#[derive(Serialize, Deserialize, Debug, Described, Clone)]
#[metadata_type(ElasticMeta)]
#[metadata(index=true, store=false)]
pub struct SubmissionParams {
    /// Classification of the submission
    pub classification: ClassificationString,
    /// Should a deep scan be performed?
    #[serde(default)]
    pub deep_scan: bool,
    /// Description of the submission
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[metadata(store=true, copyto="__text__")]
    pub description: Option<Text>,
    /// Should this submission generate an alert?
    #[serde(default)]
    pub generate_alert: bool,
    /// List of groups related to this scan
    // #[serde(default, skip_serializing_if = "Vec::is_empty")]
    #[serde(default)]
    pub groups: Vec<UpperString>,
    /// Ignore the cached service results?
    #[serde(default)]
    pub ignore_cache: bool,
    /// Should we ignore dynamic recursion prevention?
    #[serde(default)]
    pub ignore_recursion_prevention: bool,
    /// Should we ignore filtering services?
    #[serde(default)]
    pub ignore_filtering: bool,
    /// Ignore the file size limits?
    #[serde(default)]
    pub ignore_size: bool,
    /// Exempt from being dropped by ingester?
    #[serde(default)]
    pub never_drop: bool,
    /// Is the file submitted already known to be malicious?
    #[serde(default)]
    pub malicious: bool,
    /// Max number of extracted files (100 when not provided)
    #[serde(default="default_max_extracted")]
    pub max_extracted: i32,
    /// Max number of supplementary files (100 when not provided)
    #[serde(default="default_max_supplementary")]
    pub max_supplementary: i32,
    /// Priority of the scan (1000 when not provided)
    #[serde(default="default_priority")]
    pub priority: u16,
    /// Does this submission count against quota?
    #[serde(default)]
    pub quota_item: bool,
    /// Service selection
    #[serde(default)]
    pub services: ServiceSelection,
    /// Service-specific parameters
    #[serde(default)]
    #[metadata(index=false, store=false)]
    pub service_spec: HashMap<ServiceName, JsonMap>,
    /// User who submitted the file
    #[metadata(store=true, copyto="__text__")]
    pub submitter: String,
    /// Collect extra logging information during dispatching
    #[serde(default)]
    pub trace: bool,
    /// Time, in days, to live for this submission
    #[serde(default)]
    pub ttl: i32,
    /// Type of submission; stored under the key `type` ("USER" when not provided)
    #[serde(rename="type", default="default_type")]
    pub submission_type: String,
    /// Initialization for temporary submission data
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[metadata(index=false)]
    pub initial_data: Option<Text>,
    /// Does the submission automatically go into the archive when completed?
    #[serde(default)]
    pub auto_archive: bool,
    /// When the submission is archived, should we delete it from hot storage right away?
    #[serde(default)]
    pub delete_after_archive: bool,
    /// Parent submission ID
    #[serde(default)]
    pub psid: Option<Sid>,
    /// Should we use the alternate dtl while archiving?
    #[serde(default)]
    pub use_archive_alternate_dtl: bool,
}
227
/// Serde default for the maximum number of extracted files.
fn default_max_extracted() -> i32 {
    100
}
/// Serde default for the maximum number of supplementary files.
fn default_max_supplementary() -> i32 {
    100
}
/// Serde default for the scan priority.
fn default_priority() -> u16 {
    1000
}
/// Serde default for the submission type.
fn default_type() -> String {
    String::from("USER")
}
232
233impl SubmissionParams {
234    pub fn new(classification: ClassificationString) -> Self {
235        Self {
236            classification,
237            deep_scan: false,
238            description: None,
239            generate_alert: false,
240            groups: vec![],
241            ignore_cache: false,
242            ignore_recursion_prevention: false,
243            ignore_filtering: false,
244            ignore_size: false,
245            never_drop: false,
246            malicious: false,
247            max_extracted: default_max_extracted(),
248            max_supplementary: default_max_supplementary(),
249            priority: default_priority(),
250            quota_item: false,
251            services: Default::default(),
252            service_spec: Default::default(),
253            submitter: "USER".to_owned(),
254            trace: false,
255            ttl: 30,
256            submission_type: default_type(),
257            initial_data: None,
258            auto_archive: false,
259            delete_after_archive: false,
260            psid: None,
261            use_archive_alternate_dtl: false,
262        }
263    }
264
265    pub fn set_description(mut self, text: &str) -> Self {
266        self.description = Some(text.into()); self
267    }    
268
269    pub fn set_services_selected(mut self, selected: &[&str]) -> Self {
270        self.services.selected = selected.iter().map(|s|ServiceName::from_string(s.to_string())).collect(); self
271    }
272
273    pub fn set_submitter(mut self, submitter: &str) -> Self {
274        self.submitter = submitter.to_owned(); self
275    }
276
277    pub fn set_groups(mut self, groups: &[&str]) -> Self {
278        self.groups = groups.iter().map(|s|s.parse().unwrap()).collect(); self
279    }
280
281    pub fn set_max_extracted(mut self, max_extracted: i32) -> Self {
282        self.max_extracted = max_extracted; self
283    }
284
285    pub fn set_generate_alert(mut self, alert: bool) -> Self {
286        self.generate_alert = alert; self
287    }
288
289    /// Get the sections of the submission parameters that should be used in result hashes.
290    fn get_hashing_keys(&self) -> Vec<(String, serde_json::Value)> {
291        [
292            ("classification", json!(self.classification)),
293            ("deep_scan", json!(self.deep_scan)),
294            ("ignore_cache", json!(self.ignore_cache)),
295            ("ignore_recursion_prevention", json!(self.ignore_recursion_prevention)),
296            ("ignore_filtering", json!(self.ignore_filtering)),
297            ("ignore_size", json!(self.ignore_size)),
298            ("max_extracted", json!(self.max_extracted)),
299            ("max_supplementary", json!(self.max_supplementary)),
300        ].into_iter().map(|(key, value)|(key.to_owned(), value)).collect()
301    }
302
303
304    /// This is the key used to store the final score of a submission for fast lookup.
305    /// 
306    /// This lookup is one of the methods used to check for duplication in ingestion process,
307    /// so this key is fairly sensitive.
308    pub fn create_filescore_key(&self, sha256: &Sha256, services: Option<Vec<ServiceName>>) -> String {
309        // TODO do we need this version thing still be here?
310        // One up this if the cache is ever messed up and we
311        // need to quickly invalidate all old cache entries.
312        let version = 0;
313
314        let services = match services {
315            Some(services) => services,
316            None => self.services.selected.clone(),
317        };
318
319        let mut data = self.get_hashing_keys();
320        data.push(("service_spec".to_owned(), {
321            let mut spec = vec![];
322            for (key, values) in self.service_spec.clone() {
323                let mut values: Vec<(String, Value)> = values.into_iter().collect();
324                values.sort_by(|a, b|a.0.cmp(&b.0));
325                spec.push((key, values));
326            }
327            spec.sort_by(|a, b|a.0.cmp(&b.0));
328            json!(spec)
329        }));
330        data.push(("sha256".to_owned(), json!(sha256)));
331        data.push(("services".to_owned(), json!(services)));
332
333        let s = data.into_iter().map(|(k, v)| format!("{k}: {v}")).collect::<Vec<String>>().join(", ");
334        // s = ', '.join([f"{k}: {data[k]}" for k in sorted(data.keys())])
335
336        use md5::{Md5, Digest};
337        let mut hasher = Md5::new();
338        hasher.update(s);
339        let hash = hasher.finalize();
340        let mut hex = String::new();
341        for byte in hash {
342            hex += &format!("{byte:x}");
343        }
344
345        format!("{hex}v{version}")
346    }
347}
348
349// pub struct SubmissionParamsBuilder {
350//     params: SubmissionParams,
351// }
352
353// impl SubmissionParamsBuilder {
354//     pub fn new(classification: ClassificationString) -> Self {
355//         Self { params: SubmissionParams::new(classification) }
356//     }
357
358
359// }
360
361
/// Service Selection Scheme
///
/// Lists missing from the input deserialize as empty, and empty lists are
/// left out when serializing.
#[derive(Serialize, Deserialize, Default, Debug, Described, Clone)]
#[serde(default)]
#[metadata_type(ElasticMeta)]
#[metadata(index=false, store=false)]
pub struct ServiceSelection {
    /// List of excluded services
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub excluded: Vec<ServiceName>,
    /// List of services to rescan when moving between systems
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rescan: Vec<ServiceName>,
    /// Services to add to the selection when resubmitting
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub resubmit: Vec<ServiceName>,
    /// List of selected services
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub selected: Vec<ServiceName>,
}
381
/// Submission-Relevant Times
#[derive(Serialize, Deserialize, Debug, Described, Clone)]
#[metadata_type(ElasticMeta)]
#[metadata(index=true, store=true)]
pub struct Times {
    /// Date at which the submission finished scanning (`None` when no completion time is recorded)
    #[metadata(store=false)]
    pub completed: Option<DateTime<Utc>>,
    /// Date at which the submission started scanning
    pub submitted: DateTime<Utc>,
}
393
394impl Default for Times {
395    fn default() -> Self {
396        Self { 
397            completed: None, 
398            submitted: Utc::now() 
399        }
400    }
401}
402
/// Submission Verdict
///
/// Both lists default to empty when missing from the input.
#[derive(Serialize, Deserialize, Debug, Described, Clone, Default)]
#[metadata_type(ElasticMeta)]
#[metadata(index=true, store=false)]
#[serde(default)]
pub struct Verdict {
    /// List of users who think this submission is malicious
    pub malicious: Vec<String>,
    /// List of users who think this submission is non-malicious
    pub non_malicious: Vec<String>,
}
414
/// Status of a submission.
///
/// Serialized through its string form, which is the lowercase variant name
/// (e.g. `"failed"`); parsing accepts any ASCII casing.
#[derive(SerializeDisplay, DeserializeFromStr, Debug, PartialEq, Eq, strum::Display, strum::EnumString, Described, Clone, Copy)]
#[strum(ascii_case_insensitive, serialize_all = "lowercase")]
#[metadata_type(ElasticMeta)]
pub enum SubmissionState {
    /// Serialized as `"failed"`
    Failed,
    /// Serialized as `"submitted"`
    Submitted,
    /// Serialized as `"completed"`
    Completed,
}
423
#[test]
fn test_state_serialization() {
    let failed = SubmissionState::Failed;

    // Serializing produces the lowercase name, both as text and as a JSON value.
    let as_text = serde_json::to_string(&failed).unwrap();
    assert_eq!(as_text, "\"failed\"");

    // Parsing accepts the lowercase form...
    let from_lower: SubmissionState = serde_json::from_str("\"failed\"").unwrap();
    assert_eq!(from_lower, failed);

    let as_value = serde_json::to_value(failed).unwrap();
    assert_eq!(as_value, serde_json::json!("failed"));

    // ...and is case-insensitive.
    let from_capitalized: SubmissionState = serde_json::from_str("\"Failed\"").unwrap();
    assert_eq!(from_capitalized, failed);
}
431
432
/// File Model of Submission
#[derive(Serialize, Deserialize, Debug, Described, Clone)]
#[metadata_type(ElasticMeta)]
#[metadata(index=true, store=false)]
pub struct File {
    /// Name of the file
    #[metadata(copyto="__text__")]
    pub name: String,
    /// Size of the file in bytes, when known
    #[metadata(mapping="long")]
    pub size: Option<u64>,
    /// SHA256 hash of the file
    #[metadata(copyto="__text__")]
    pub sha256: Sha256,
}
448
#[cfg(feature = "rand")]
impl rand::distr::Distribution<File> for rand::distr::StandardUniform {
    /// Produce a `File` named "readme.txt" with a random size and a random hash.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> File {
        // Size is drawn before the hash, matching the field order.
        let size: u64 = rng.random_range(10..1_000_000);
        let sha256: Sha256 = rng.random();

        File {
            name: String::from("readme.txt"),
            size: Some(size),
            sha256,
        }
    }
}
458}