#![forbid(unsafe_code)]
use forensicnomicon::report::{Category, Finding, Observation, Severity, Source};
use peripheral_core::{Bus, DeviceConnection};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeviceAnomaly {
DmaCapableDevice {
instance_id: String,
bus: Bus,
},
MassStorageConnected {
instance_id: String,
},
HidDevice {
instance_id: String,
},
OsGeneratedSerial {
instance_id: String,
},
}
impl DeviceAnomaly {
#[must_use]
pub fn code(&self) -> &'static str {
match self {
Self::DmaCapableDevice { .. } => "PERIPHERAL-DMA-CAPABLE-DEVICE",
Self::MassStorageConnected { .. } => "PERIPHERAL-MASS-STORAGE-CONNECTED",
Self::HidDevice { .. } => "PERIPHERAL-HID-DEVICE",
Self::OsGeneratedSerial { .. } => "PERIPHERAL-OS-GENERATED-SERIAL",
}
}
}
impl Observation for DeviceAnomaly {
fn severity(&self) -> Option<Severity> {
Some(match self {
Self::DmaCapableDevice { .. } => Severity::High,
Self::MassStorageConnected { .. } | Self::HidDevice { .. } => Severity::Medium,
Self::OsGeneratedSerial { .. } => Severity::Low,
})
}
fn code(&self) -> &'static str {
DeviceAnomaly::code(self)
}
fn category(&self) -> Category {
match self {
Self::DmaCapableDevice { .. }
| Self::MassStorageConnected { .. }
| Self::HidDevice { .. } => Category::Threat,
Self::OsGeneratedSerial { .. } => Category::Integrity,
}
}
fn mitre(&self) -> &'static [&'static str] {
match self {
Self::DmaCapableDevice { .. } | Self::HidDevice { .. } => &["T1200"],
Self::MassStorageConnected { .. } => &["T1052.001", "T1091"],
Self::OsGeneratedSerial { .. } => &[],
}
}
fn note(&self) -> String {
match self {
Self::DmaCapableDevice { instance_id, bus } => format!(
"a {bus:?} device ({instance_id:?}) connected; the bus is bus-mastering \
DMA-capable, consistent with a direct-memory-access attack surface \
(MITRE T1200)"
),
Self::MassStorageConnected { instance_id } => format!(
"removable mass storage ({instance_id:?}) connected; consistent with data \
staging/exfiltration or autorun payload delivery (MITRE T1052.001 / T1091)"
),
Self::HidDevice { instance_id } => format!(
"a human-interface device ({instance_id:?}) connected; consistent with \
keystroke-injection hardware such as BadUSB (MITRE T1200)"
),
Self::OsGeneratedSerial { instance_id } => format!(
"the device ({instance_id:?}) exposed no real iSerial — Windows synthesized \
the instance-id serial; consistent with weaker device attribution"
),
}
}
}
#[must_use]
pub fn audit(devices: &[DeviceConnection]) -> Vec<DeviceAnomaly> {
let mut out = Vec::new();
for d in devices {
let id = || d.device_instance_id.clone();
if d.dma_capable {
out.push(DeviceAnomaly::DmaCapableDevice {
instance_id: id(),
bus: d.bus,
});
}
if d.bus.is_mass_storage() {
out.push(DeviceAnomaly::MassStorageConnected { instance_id: id() });
}
if is_hid(d) {
out.push(DeviceAnomaly::HidDevice { instance_id: id() });
}
if d.serial_is_os_generated {
out.push(DeviceAnomaly::OsGeneratedSerial { instance_id: id() });
}
}
out
}
#[must_use]
pub fn audit_findings(devices: &[DeviceConnection], scope: impl Into<String>) -> Vec<Finding> {
let src = source(scope);
audit(devices)
.iter()
.map(|a| a.to_finding(src.clone()))
.collect()
}
fn is_hid(d: &DeviceConnection) -> bool {
if d.bus == Bus::Bluetooth {
return true;
}
let id = d.device_instance_id.to_ascii_uppercase();
id.starts_with("HID\\") || id.contains("\\HID") || id.contains("&HID")
}
#[must_use]
pub fn source(scope: impl Into<String>) -> Source {
Source {
analyzer: "peripheral-forensic".to_string(),
scope: scope.into(),
version: Some(env!("CARGO_PKG_VERSION").to_string()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use peripheral_core::{MitreRef, Provenance};
fn conn(instance_id: &str, bus: Bus, dma: bool, os_serial: bool) -> DeviceConnection {
DeviceConnection {
bus,
device_class_guid: None,
vid: None,
pid: None,
device_serial: None,
serial_is_os_generated: os_serial,
friendly_name: None,
device_instance_id: instance_id.to_string(),
first_install: None,
last_install: None,
last_arrival: None,
last_removal: None,
parent_id_prefix: None,
volume_guid: None,
drive_letter: None,
volume_serial: None,
disk_signature: None,
dma_capable: dma,
mitre: vec![MitreRef("T1200")],
source: Provenance {
file: "f".into(),
line: 1,
},
}
}
fn codes(a: &[DeviceAnomaly]) -> Vec<&str> {
a.iter().map(DeviceAnomaly::code).collect()
}
#[test]
fn dma_device_is_flagged_high_threat() {
let a = audit(&[conn("1394\\X\\0", Bus::FireWire, true, false)]);
assert!(codes(&a).contains(&"PERIPHERAL-DMA-CAPABLE-DEVICE"));
let dma = a
.iter()
.find(|x| x.code() == "PERIPHERAL-DMA-CAPABLE-DEVICE")
.unwrap();
assert_eq!(dma.severity(), Some(Severity::High));
assert_eq!(dma.category(), Category::Threat);
assert!(dma.mitre().contains(&"T1200"));
}
#[test]
fn mass_storage_is_flagged_medium_threat() {
let a = audit(&[conn("USBSTOR\\Disk\\X", Bus::Usb, false, false)]);
assert!(codes(&a).contains(&"PERIPHERAL-MASS-STORAGE-CONNECTED"));
let ms = a
.iter()
.find(|x| x.code() == "PERIPHERAL-MASS-STORAGE-CONNECTED")
.unwrap();
assert_eq!(ms.severity(), Some(Severity::Medium));
assert!(ms.mitre().contains(&"T1052.001"));
assert!(ms.mitre().contains(&"T1091"));
}
#[test]
fn hid_device_is_flagged() {
assert!(
codes(&audit(&[conn("BTHENUM\\X", Bus::Bluetooth, false, false)]))
.contains(&"PERIPHERAL-HID-DEVICE")
);
assert!(codes(&audit(&[conn(
"HID\\VID_046D&PID_C52B\\X",
Bus::Usb,
false,
false
)]))
.contains(&"PERIPHERAL-HID-DEVICE"));
}
#[test]
fn os_generated_serial_is_flagged_low_integrity() {
let a = audit(&[conn("USBSTOR\\Disk\\7&abc&0", Bus::Usb, false, true)]);
assert!(codes(&a).contains(&"PERIPHERAL-OS-GENERATED-SERIAL"));
let os = a
.iter()
.find(|x| x.code() == "PERIPHERAL-OS-GENERATED-SERIAL")
.unwrap();
assert_eq!(os.severity(), Some(Severity::Low));
assert_eq!(os.category(), Category::Integrity);
assert!(os.mitre().is_empty());
}
#[test]
fn benign_non_storage_non_dma_device_fires_nothing() {
let a = audit(&[conn("WpdBusEnumRoot\\X", Bus::Mtp, false, false)]);
assert!(a.is_empty(), "got {:?}", codes(&a));
}
#[test]
fn findings_are_hedged_observations_never_verdicts() {
let f = audit_findings(
&[
conn("1394\\X\\0", Bus::FireWire, true, false),
conn("USBSTOR\\Disk\\7&abc&0", Bus::Usb, false, true),
conn("BTHENUM\\X", Bus::Bluetooth, false, false),
],
"host",
);
assert_eq!(
f.len(),
4,
"DMA + (mass-storage + os-serial) + hid = 4 findings"
);
for finding in &f {
let note = finding.note.to_ascii_lowercase();
assert!(note.contains("consistent with"), "must hedge: {note}");
for forbidden in ["proves", "confirms", "definitely"] {
assert!(
!note.contains(forbidden),
"must not assert a verdict: {note}"
);
}
}
}
#[test]
fn source_stamps_analyzer_and_version() {
let s = source("partition 1");
assert_eq!(s.analyzer, "peripheral-forensic");
assert_eq!(s.scope, "partition 1");
assert!(s.version.is_some());
}
}