//! Datastore models for submissions (`assemblyline_models/datastore/submission.rs`).
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use serde_with::{DeserializeFromStr, SerializeDisplay};
9use struct_metadata::Described;
10
11#[cfg(feature = "rand")]
12use rand::RngExt;
13
14use crate::types::{ClassificationString, ExpandingClassification, JsonMap, ServiceName, Sha256, Sid, Text, UpperString, Wildcard};
15use crate::{ElasticMeta, Readable};
16
17
18/// A logging event describing the processing of a submission
19#[derive(Serialize, Deserialize, Debug, Described, Clone)]
20#[metadata_type(ElasticMeta)]
21#[metadata(index=false, store=false)]
22pub struct TraceEvent {
23    #[serde(default="default_now")]
24    pub timestamp: DateTime<Utc>,
25    pub event_type: String,
26    pub service: Option<ServiceName>,
27    pub file: Option<Sha256>,
28    pub message: Option<String>,
29}
30
31fn default_now() -> DateTime<Utc> { Utc::now() }
32
33/// Model of Submission
34#[derive(Serialize, Deserialize, Debug, Described, Clone)]
35#[metadata_type(ElasticMeta)]
36#[metadata(index=true, store=true)]
37pub struct Submission {
38    /// Time at which the submission was archived
39    #[serde(default)]
40    pub archive_ts: Option<DateTime<Utc>>,
41    /// Document is present in the malware archive
42    #[serde(default)]
43    pub archived: bool,
44    /// Classification of the submission
45    #[serde(flatten)]
46    pub classification: ExpandingClassification,
47    /// A log of events describing the processing sequence.
48    #[serde(default)]
49    #[metadata(store=false, index=false)]
50    pub tracing_events: Vec<TraceEvent>,
51    /// Total number of errors in the submission
52    pub error_count: i32,
53    /// List of error keys
54    #[metadata(store=false)]
55    pub errors: Vec<String>,
56    /// Expiry timestamp
57    #[serde(default)]
58    #[metadata(store=false)]
59    pub expiry_ts: Option<DateTime<Utc>>,
60    /// Total number of files in the submission
61    pub file_count: i32,
62    /// List of files that were originally submitted
63    pub files: Vec<File>,
64    // pub file: File,
65    /// Maximum score of all the files in the scan
66    pub max_score: i32,
67    /// Metadata associated to the submission
68    #[serde(default)]
69    #[metadata(store=false, mapping="flattenedobject", copyto="__text__")]
70    pub metadata: HashMap<String, Wildcard>,
71    /// Submission parameter details
72    pub params: SubmissionParams,
73    /// List of result keys
74    #[metadata(store=false)]
75    pub results: Vec<Wildcard>,
76    /// Submission ID
77    #[metadata(copyto="__text__")]
78    pub sid: Sid,
79    /// Status of the submission
80    pub state: SubmissionState,
81    /// This document is going to be deleted as soon as it finishes
82    #[serde(default)]
83    pub to_be_deleted: bool,
84    /// Submission-specific times
85    #[serde(default)]
86    pub times: Times,
87    /// Malicious verdict details
88    #[serde(default)]
89    pub verdict: Verdict,
90    /// Was loaded from the archive
91    #[serde(default)]
92    #[metadata(index=false)]
93    pub from_archive: bool,
94
95    /// the filescore key, used in deduplication. This is a non-unique key, that is
96    /// shared by submissions that may be processed as duplicates.
97    #[serde(default)]
98    #[metadata(index=false, store=false)]
99    pub scan_key: Option<String>,
100}
101
#[cfg(feature = "rand")]
impl rand::distr::Distribution<Submission> for rand::distr::StandardUniform {
    /// Generate a random single-file `Submission` in the `Submitted` state.
    ///
    /// Only `archived`, the submitted file, `max_score` and `sid` are random, and they are
    /// drawn from `rng` in that order. Every other field gets a fixed, empty or "now" value.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> Submission {
        // Bindings follow the original field evaluation order so seeded RNGs reproduce it.
        let archived: bool = rng.random();
        let classification = ExpandingClassification::try_unrestricted().unwrap();
        let files: Vec<File> = vec![rng.random()];
        let max_score: i32 = rng.random();
        let params = SubmissionParams::new(ClassificationString::try_unrestricted().unwrap());
        let sid: Sid = rng.random();
        let times = Times { completed: None, submitted: Utc::now() };

        Submission {
            archive_ts: None,
            archived,
            classification,
            tracing_events: Vec::new(),
            error_count: 0,
            errors: Vec::new(),
            expiry_ts: None,
            file_count: 1,
            files,
            max_score,
            metadata: HashMap::new(),
            params,
            results: Vec::new(),
            sid,
            state: SubmissionState::Submitted,
            to_be_deleted: false,
            times,
            verdict: Verdict::default(),
            from_archive: false,
            scan_key: None,
        }
    }
}
135
136impl Readable for Submission {
137    fn set_from_archive(&mut self, from_archive: bool) {
138        self.from_archive = from_archive
139    }
140}
141
142
143/// Submission Parameters
144#[derive(Serialize, Deserialize, Debug, Described, Clone)]
145#[metadata_type(ElasticMeta)]
146#[metadata(index=true, store=false)]
147pub struct SubmissionParams {
148    /// classification of the submission
149    pub classification: ClassificationString,
150    /// Should a deep scan be performed?
151    #[serde(default)]
152    pub deep_scan: bool,
153    /// Description of the submission
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    #[metadata(store=true, copyto="__text__")]
156    pub description: Option<Text>,
157    /// Override the default filetype identification
158    #[serde(default, skip_serializing_if = "Option::is_none")]
159    pub filetype_override: Option<String>,
160    /// Should this submission generate an alert?
161    #[serde(default)]
162    pub generate_alert: bool,
163    /// List of groups related to this scan
164    // #[serde(default, skip_serializing_if = "Vec::is_empty")]
165    #[serde(default)]
166    pub groups: Vec<UpperString>,
167    /// Ignore the cached service results?
168    #[serde(default)]
169    pub ignore_cache: bool,
170    /// Should we ignore dynamic recursion prevention?
171    #[serde(default)]
172    pub ignore_recursion_prevention: bool,
173    /// Should we ignore filtering services?
174    #[serde(default)]
175    pub ignore_filtering: bool,
176    /// Ignore the file size limits?
177    #[serde(default)]
178    pub ignore_size: bool,
179    /// Exempt from being dropped by ingester?
180    #[serde(default)]
181    pub never_drop: bool,
182    /// Is the file submitted already known to be malicious?
183    #[serde(default)]
184    pub malicious: bool,
185    /// Max number of extracted files
186    #[serde(default="default_max_extracted")]
187    pub max_extracted: i32,
188    /// Max number of supplementary files
189    #[serde(default="default_max_supplementary")]
190    pub max_supplementary: i32,
191    /// Priority of the scan
192    #[serde(default="default_priority")]
193    pub priority: u16,
194    /// Does this submission count against quota?
195    #[serde(default)]
196    pub quota_item: bool,
197    /// Service selection
198    #[serde(default)]
199    pub services: ServiceSelection,
200    /// Service-specific parameters
201    #[serde(default)]
202    #[metadata(index=false, store=false)]
203    pub service_spec: HashMap<ServiceName, JsonMap>,
204    /// User who submitted the file
205    #[metadata(store=true, copyto="__text__")]
206    pub submitter: String,
207    /// Collect extra logging information during dispatching
208    #[serde(default)]
209    pub trace: bool,
210    /// Time, in days, to live for this submission
211    #[serde(default)]
212    pub ttl: i32,
213    /// Type of submission
214    #[serde(rename="type", default="default_type")]
215    pub submission_type: String,
216    /// Initialization for temporary submission data
217    #[serde(default, skip_serializing_if = "Option::is_none")]
218    #[metadata(index=false)]
219    pub initial_data: Option<Text>,
220    /// Does the submission automatically goes into the archive when completed?
221    #[serde(default)]
222    pub auto_archive: bool,
223    /// When the submission is archived, should we delete it from hot storage right away?
224    #[serde(default)]
225    pub delete_after_archive: bool,
226    /// Parent submission ID
227    #[serde(default)]
228    pub psid: Option<Sid>,
229    /// Should we use the alternate dtl while archiving?
230    #[serde(default)]
231    pub use_archive_alternate_dtl: bool,
232}
233
/// Serde default for `SubmissionParams::max_extracted` (also used by `SubmissionParams::new`).
fn default_max_extracted() -> i32 {
    100
}
/// Serde default for `SubmissionParams::max_supplementary` (also used by `SubmissionParams::new`).
fn default_max_supplementary() -> i32 {
    100
}
/// Serde default for `SubmissionParams::priority` (also used by `SubmissionParams::new`).
fn default_priority() -> u16 {
    1000
}
/// Serde default for `SubmissionParams::submission_type`, serialized under the key `type`.
fn default_type() -> String {
    String::from("USER")
}
238
239impl SubmissionParams {
240    pub fn new(classification: ClassificationString) -> Self {
241        Self {
242            classification,
243            deep_scan: false,
244            description: None,
245            filetype_override: None,
246            generate_alert: false,
247            groups: vec![],
248            ignore_cache: false,
249            ignore_recursion_prevention: false,
250            ignore_filtering: false,
251            ignore_size: false,
252            never_drop: false,
253            malicious: false,
254            max_extracted: default_max_extracted(),
255            max_supplementary: default_max_supplementary(),
256            priority: default_priority(),
257            quota_item: false,
258            services: Default::default(),
259            service_spec: Default::default(),
260            submitter: "USER".to_owned(),
261            trace: false,
262            ttl: 30,
263            submission_type: default_type(),
264            initial_data: None,
265            auto_archive: false,
266            delete_after_archive: false,
267            psid: None,
268            use_archive_alternate_dtl: false,
269        }
270    }
271
272    pub fn set_description(mut self, text: &str) -> Self {
273        self.description = Some(text.into()); self
274    }
275
276    pub fn set_ignore_cache(mut self, value: bool) -> Self {
277        self.ignore_cache = value; self
278    }
279
280    pub fn set_services_selected(mut self, selected: &[&str]) -> Self {
281        self.services.selected = selected.iter().map(|s|ServiceName::from_string(s.to_string())).collect(); self
282    }
283
284    pub fn set_submitter(mut self, submitter: &str) -> Self {
285        self.submitter = submitter.to_owned(); self
286    }
287
288    pub fn set_groups(mut self, groups: &[&str]) -> Self {
289        self.groups = groups.iter().map(|s|s.parse().unwrap()).collect(); self
290    }
291
292    pub fn set_max_extracted(mut self, max_extracted: i32) -> Self {
293        self.max_extracted = max_extracted; self
294    }
295
296    pub fn set_generate_alert(mut self, alert: bool) -> Self {
297        self.generate_alert = alert; self
298    }
299
300    pub fn set_initial_data(mut self, initial_data: Option<Text>) -> Self {
301        self.initial_data = initial_data; self
302    }
303
304    /// Get the sections of the submission parameters that should be used in result hashes.
305    fn get_hashing_keys(&self) -> Vec<(String, serde_json::Value)> {
306        [
307            ("classification", json!(self.classification)),
308            ("deep_scan", json!(self.deep_scan)),
309            ("ignore_cache", json!(self.ignore_cache)),
310            ("ignore_recursion_prevention", json!(self.ignore_recursion_prevention)),
311            ("ignore_filtering", json!(self.ignore_filtering)),
312            ("ignore_size", json!(self.ignore_size)),
313            ("max_extracted", json!(self.max_extracted)),
314            ("max_supplementary", json!(self.max_supplementary)),
315        ].into_iter().map(|(key, value)|(key.to_owned(), value)).collect()
316    }
317
318
319    /// This is the key used to store the final score of a submission for fast lookup.
320    ///
321    /// This lookup is one of the methods used to check for duplication in ingestion process,
322    /// so this key is fairly sensitive.
323    pub fn create_filescore_key(&self, sha256: &Sha256, services: Option<Vec<ServiceName>>) -> String {
324        // TODO do we need this version thing still be here?
325        // One up this if the cache is ever messed up and we
326        // need to quickly invalidate all old cache entries.
327        let version = 0;
328
329        let services = match services {
330            Some(services) => services,
331            None => self.services.selected.clone(),
332        };
333
334        let mut data = self.get_hashing_keys();
335        data.push(("service_spec".to_owned(), {
336            let mut spec = vec![];
337            for (key, values) in self.service_spec.clone() {
338                let mut values: Vec<(String, Value)> = values.into_iter().collect();
339                values.sort_by(|a, b|a.0.cmp(&b.0));
340                spec.push((key, values));
341            }
342            spec.sort_by(|a, b|a.0.cmp(&b.0));
343            json!(spec)
344        }));
345        data.push(("sha256".to_owned(), json!(sha256)));
346        data.push(("services".to_owned(), json!(services)));
347
348        let s = data.into_iter().map(|(k, v)| format!("{k}: {v}")).collect::<Vec<String>>().join(", ");
349        // s = ', '.join([f"{k}: {data[k]}" for k in sorted(data.keys())])
350
351        use md5::{Md5, Digest};
352        let mut hasher = Md5::new();
353        hasher.update(s);
354        let hash = hasher.finalize();
355        let mut hex = String::new();
356        for byte in hash {
357            hex += &format!("{byte:x}");
358        }
359
360        format!("{hex}v{version}")
361    }
362}
363
364// pub struct SubmissionParamsBuilder {
365//     params: SubmissionParams,
366// }
367
368// impl SubmissionParamsBuilder {
369//     pub fn new(classification: ClassificationString) -> Self {
370//         Self { params: SubmissionParams::new(classification) }
371//     }
372
373
374// }
375
376
377/// Service Selection Scheme
378#[derive(Serialize, Deserialize, Default, Debug, Described, Clone)]
379#[serde(default)]
380#[metadata_type(ElasticMeta)]
381#[metadata(index=false, store=false)]
382pub struct ServiceSelection {
383    /// List of excluded services
384    #[serde(skip_serializing_if = "Vec::is_empty")]
385    pub excluded: Vec<ServiceName>,
386    /// List of services to rescan when moving between systems
387    #[serde(skip_serializing_if = "Vec::is_empty")]
388    pub rescan: Vec<ServiceName>,
389    /// Add to service selection when resubmitting
390    #[serde(skip_serializing_if = "Vec::is_empty")]
391    pub resubmit: Vec<ServiceName>,
392    /// List of selected services
393    #[serde(skip_serializing_if = "Vec::is_empty")]
394    pub selected: Vec<ServiceName>,
395}
396
397/// Submission-Relevant Times
398#[derive(Serialize, Deserialize, Debug, Described, Clone)]
399#[metadata_type(ElasticMeta)]
400#[metadata(index=true, store=true)]
401pub struct Times {
402    /// Date at which the submission finished scanning
403    #[metadata(store=false)]
404    pub completed: Option<DateTime<Utc>>,
405    /// Date at which the submission started scanning
406    pub submitted: DateTime<Utc>,
407}
408
409impl Default for Times {
410    fn default() -> Self {
411        Self {
412            completed: None,
413            submitted: Utc::now()
414        }
415    }
416}
417
418/// Submission Verdict
419#[derive(Serialize, Deserialize, Debug, Described, Clone, Default)]
420#[metadata_type(ElasticMeta)]
421#[metadata(index=true, store=false)]
422#[serde(default)]
423pub struct Verdict {
424    /// List of user that thinks this submission is malicious
425    pub malicious: Vec<String>,
426    /// List of user that thinks this submission is non-malicious
427    pub non_malicious: Vec<String>,
428}
429
430#[derive(SerializeDisplay, DeserializeFromStr, Debug, PartialEq, Eq, strum::Display, strum::EnumString, Described, Clone, Copy)]
431#[strum(ascii_case_insensitive, serialize_all = "lowercase")]
432#[metadata_type(ElasticMeta)]
433pub enum SubmissionState {
434    Failed,
435    Submitted,
436    Completed,
437}
438
#[test]
fn test_state_serialization() {
    // Serializing produces the lowercase variant name, as a string and as a JSON value.
    let encoded = serde_json::to_string(&SubmissionState::Failed).unwrap();
    assert_eq!(encoded, "\"failed\"");
    let as_value = serde_json::to_value(SubmissionState::Failed).unwrap();
    assert_eq!(as_value, serde_json::json!("failed"));

    // Parsing accepts the variant name regardless of ASCII case.
    for text in ["\"failed\"", "\"Failed\""] {
        let parsed = serde_json::from_str::<SubmissionState>(text).unwrap();
        assert_eq!(parsed, SubmissionState::Failed);
    }
}
446
447
448/// File Model of Submission
449#[derive(Serialize, Deserialize, Debug, Described, Clone)]
450#[metadata_type(ElasticMeta)]
451#[metadata(index=true, store=false)]
452pub struct File {
453    /// Name of the file
454    #[metadata(copyto="__text__")]
455    pub name: String,
456    /// Size of the file in bytes
457    #[metadata(mapping="long")]
458    pub size: Option<u64>,
459    /// SHA256 hash of the file
460    #[metadata(copyto="__text__")]
461    pub sha256: Sha256,
462}
463
#[cfg(feature = "rand")]
impl rand::distr::Distribution<File> for rand::distr::StandardUniform {
    /// Generate a `File` named `readme.txt` with a random hash and a random size.
    ///
    /// The size is drawn from `10..1_000_000` first, then the hash.
    fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> File {
        let size: u64 = rng.random_range(10..1_000_000);
        let sha256 = rng.random();
        File {
            name: String::from("readme.txt"),
            size: Some(size),
            sha256,
        }
    }
}