use std::collections::HashMap;
use std::path::PathBuf;
use pyo3::{pyclass, pymethods, PyResult};
use uuid::Uuid;
/// A label as described by the Malware DB API, exposed to Python as an
/// immutable (`frozen`) class whose fields are readable attributes.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct Label {
/// Numeric identifier of the label.
#[pyo3(get)]
pub id: u64,
/// Name of the label; this is what `__str__` returns.
#[pyo3(get)]
pub name: String,
/// Parent of this label, if it has one; `__repr__` renders it as
/// `<parent>/<name>`.
#[pyo3(get)]
pub parent: Option<String>,
}
#[pymethods]
impl Label {
    /// Debug-style representation.
    ///
    /// Produces `Label <id>: <parent>/<name>` when a parent is set and
    /// `Label <id>: <name>` otherwise.
    #[must_use]
    pub fn __repr__(&self) -> String {
        match &self.parent {
            Some(parent) => format!("Label {}: {parent}/{}", self.id, self.name),
            None => format!("Label {}: {}", self.id, self.name),
        }
    }

    /// String form: the label's name on its own.
    #[must_use]
    pub fn __str__(&self) -> String {
        self.name.clone()
    }
}
impl From<malwaredb_client::malwaredb_api::Label> for Label {
    /// Builds the Python-facing [`Label`] from the client library's label,
    /// moving `id`, `name` and `parent` across unchanged.
    fn from(api_label: malwaredb_client::malwaredb_api::Label) -> Self {
        let id = api_label.id;
        let name = api_label.name;
        let parent = api_label.parent;
        Self { id, name, parent }
    }
}
/// Information a Malware DB server reports about itself, exposed to Python
/// as an immutable (`frozen`) class whose fields are readable attributes.
///
/// All values are taken from the API's `ServerInfo` response; the string
/// fields are kept exactly as received (their formats are decided by the
/// API, not by this type).
#[pyclass(frozen, from_py_object)]
#[derive(Debug, Clone)]
pub struct ServerInfo {
/// Operating system name given in the API response.
#[pyo3(get)]
pub os_name: String,
/// Memory usage given in the API response, as a string.
#[pyo3(get)]
pub memory_used: String,
/// Malware DB version, stored as the string rendering of the API's
/// version value.
#[pyo3(get)]
pub mdb_version: String,
/// Database version given in the API response.
#[pyo3(get)]
pub db_version: String,
/// Database size given in the API response, as a string.
#[pyo3(get)]
pub db_size: String,
/// Number of samples given in the API response.
#[pyo3(get)]
pub num_samples: u64,
/// Number of users given in the API response.
#[pyo3(get)]
pub num_users: u32,
/// Uptime given in the API response, as a string.
#[pyo3(get)]
pub uptime: String,
/// Instance name given in the API response.
#[pyo3(get)]
pub instance_name: String,
}
#[pymethods]
impl ServerInfo {
    /// Debug-style representation:
    /// `MalwareDB <version> running on <os name> for <uptime>`.
    #[must_use]
    pub fn __repr__(&self) -> String {
        let version = self.mdb_version.as_str();
        format!(
            "MalwareDB {} running on {} for {}",
            version, self.os_name, self.uptime
        )
    }

    /// String form: `MalwareDB <version>`.
    #[must_use]
    pub fn __str__(&self) -> String {
        format!("MalwareDB {}", self.mdb_version)
    }
}
impl From<malwaredb_client::malwaredb_api::ServerInfo> for ServerInfo {
    /// Builds the Python-facing [`ServerInfo`] from the API response.
    ///
    /// Every field is moved across as-is except `mdb_version`, which is
    /// stored as its `to_string()` rendering.
    fn from(api_info: malwaredb_client::malwaredb_api::ServerInfo) -> Self {
        let mdb_version = api_info.mdb_version.to_string();
        Self {
            os_name: api_info.os_name,
            memory_used: api_info.memory_used,
            mdb_version,
            db_version: api_info.db_version,
            db_size: api_info.db_size,
            num_samples: api_info.num_samples,
            num_users: api_info.num_users,
            uptime: api_info.uptime,
            instance_name: api_info.instance_name,
        }
    }
}
/// A sample source as described by the Malware DB API, exposed to Python as
/// an immutable (`frozen`) class whose fields are readable attributes.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct Source {
/// Numeric identifier of the source.
#[pyo3(get)]
pub id: u32,
/// Name of the source; this is what `__str__` returns.
#[pyo3(get)]
pub name: String,
/// Optional description; appended to `__repr__` when present.
#[pyo3(get)]
pub description: Option<String>,
/// Optional URL; appended to `__repr__` when present.
#[pyo3(get)]
pub url: Option<String>,
/// First acquisition timestamp. When built through the
/// `From<SourceInfo>` conversion this is an RFC 3339 string.
#[pyo3(get)]
pub first_acquisition: String,
/// Malicious flag from the API response.
/// NOTE(review): `None` presumably means "not specified" -- confirm
/// against the API definition.
#[pyo3(get)]
pub malicious: Option<bool>,
}
#[pymethods]
impl Source {
    /// Debug-style representation.
    ///
    /// Always starts with `<name>(<id>)`; ` from <url>` and
    /// ` -- <description>` are appended, in that order, for whichever of
    /// the two optional fields are set.
    #[must_use]
    pub fn __repr__(&self) -> String {
        let url = self
            .url
            .as_deref()
            .map_or_else(String::new, |url| format!(" from {url}"));
        let desc = self
            .description
            .as_deref()
            .map_or_else(String::new, |desc| format!(" -- {desc}"));
        format!("{}({}){url}{desc}", self.name, self.id)
    }

    /// String form: the source's name on its own.
    #[must_use]
    pub fn __str__(&self) -> String {
        self.name.clone()
    }
}
impl From<malwaredb_client::malwaredb_api::SourceInfo> for Source {
    /// Builds the Python-facing [`Source`] from the API's `SourceInfo`.
    ///
    /// Fields are moved across unchanged, except that `first_acquisition`
    /// is rendered to a string with `to_rfc3339()`.
    fn from(api_source: malwaredb_client::malwaredb_api::SourceInfo) -> Self {
        let first_acquisition = api_source.first_acquisition.to_rfc3339();
        Self {
            id: api_source.id,
            name: api_source.name,
            description: api_source.description,
            url: api_source.url,
            first_acquisition,
            malicious: api_source.malicious,
        }
    }
}
/// A file type reported by the Malware DB API, exposed to Python as an
/// immutable (`frozen`) class whose fields are readable attributes.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct SupportedFileType {
/// Name of the file type; this is what `__str__` returns.
#[pyo3(get)]
pub name: String,
/// Magic values for the file type; `__repr__` lists them after
/// "starting with", joined by " or ".
/// NOTE(review): presumably the leading bytes that identify the type,
/// in some textual encoding -- confirm against the API definition.
#[pyo3(get)]
pub magic: Vec<String>,
/// Executable flag from the API response.
#[pyo3(get)]
pub is_executable: bool,
/// Optional description of the file type.
#[pyo3(get)]
pub description: Option<String>,
}
#[pymethods]
impl SupportedFileType {
    /// Debug-style representation: `<name>, starting with <magic values>`,
    /// where the magic values are joined with " or ".
    #[must_use]
    pub fn __repr__(&self) -> String {
        let alternatives = self.magic.join(" or ");
        format!("{}, starting with {}", self.name, alternatives)
    }

    /// String form: the file type's name on its own.
    #[must_use]
    pub fn __str__(&self) -> String {
        self.name.clone()
    }
}
impl From<malwaredb_client::malwaredb_api::SupportedFileType> for SupportedFileType {
    /// Builds the Python-facing [`SupportedFileType`] from the API type,
    /// moving all four fields across unchanged.
    fn from(api_type: malwaredb_client::malwaredb_api::SupportedFileType) -> Self {
        let name = api_type.name;
        let magic = api_type.magic;
        let is_executable = api_type.is_executable;
        let description = api_type.description;
        Self {
            name,
            magic,
            is_executable,
            description,
        }
    }
}
/// Account details returned by the Malware DB API's user-info request,
/// exposed to Python as an immutable (`frozen`) class whose fields are
/// readable attributes.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct UserInfo {
/// Numeric identifier of the user.
#[pyo3(get)]
pub id: u32,
/// User name; this is what `__str__` returns.
#[pyo3(get)]
pub username: String,
/// Groups listed for the user in the API response.
#[pyo3(get)]
pub groups: Vec<String>,
/// Sources listed for the user in the API response.
#[pyo3(get)]
pub sources: Vec<String>,
/// Administrator flag from the API response.
#[pyo3(get)]
pub is_admin: bool,
/// Creation timestamp. When built through the
/// `From<GetUserInfoResponse>` conversion this is an RFC 3339 string.
#[pyo3(get)]
pub created: String,
/// Read-only flag from the API response.
#[pyo3(get)]
pub is_readonly: bool,
}
#[pymethods]
impl UserInfo {
    /// String form: a copy of the user name.
    #[must_use]
    pub fn __str__(&self) -> String {
        self.username.as_str().to_owned()
    }
}
impl From<malwaredb_client::malwaredb_api::GetUserInfoResponse> for UserInfo {
    /// Builds the Python-facing [`UserInfo`] from the API response.
    ///
    /// Fields are moved across unchanged, except that `created` is rendered
    /// to a string with `to_rfc3339()`.
    fn from(response: malwaredb_client::malwaredb_api::GetUserInfoResponse) -> Self {
        let created = response.created.to_rfc3339();
        Self {
            id: response.id,
            username: response.username,
            groups: response.groups,
            sources: response.sources,
            is_admin: response.is_admin,
            created,
            is_readonly: response.is_readonly,
        }
    }
}
/// Result of a search request, exposed to Python as an immutable (`frozen`)
/// class whose fields are readable attributes.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct SearchResults {
/// Hashes contained in this response.
#[pyo3(get)]
pub hashes: Vec<String>,
/// Total result count (the API response's `total_results`); may exceed
/// `hashes.len()`, in which case `__repr__` reports "N of M available".
#[pyo3(get)]
pub results: u64,
/// Pagination identifier from the API response, if any.
/// NOTE(review): presumably the token used to request the next page --
/// confirm against the client API.
#[pyo3(get)]
pub pagination: Option<Uuid>,
/// Optional message from the API response; used as a prefix by `__str__`
/// and by the non-paginated forms of `__repr__`.
#[pyo3(get)]
pub message: Option<String>,
}
#[pymethods]
impl SearchResults {
    /// Debug-style representation.
    ///
    /// * With a pagination identifier and more total results than hashes in
    ///   hand: `Search results showing <n> of <total> available hashes`.
    /// * Otherwise: `Search results showing <n> hashes`, prefixed with
    ///   `<message>: ` when a message is present.
    #[must_use]
    pub fn __repr__(&self) -> String {
        let partial = self.pagination.is_some() && self.results > self.hashes.len() as u64;
        if partial {
            // The message, if any, is not included in this form.
            return format!(
                "Search results showing {} of {} available hashes",
                self.hashes.len(),
                self.results
            );
        }

        match &self.message {
            Some(message) => format!(
                "{message}: Search results showing {} hashes",
                self.hashes.len()
            ),
            None => format!("Search results showing {} hashes", self.hashes.len()),
        }
    }

    /// String form: `Search results with <n> hashes`, prefixed with
    /// `<message>: ` when a message is present.
    #[must_use]
    pub fn __str__(&self) -> String {
        match &self.message {
            Some(message) => format!(
                "{message}: Search results with {} hashes",
                self.hashes.len()
            ),
            None => format!("Search results with {} hashes", self.hashes.len()),
        }
    }
}
impl From<malwaredb_client::malwaredb_api::SearchResponse> for SearchResults {
    /// Builds the Python-facing [`SearchResults`] from the API response.
    ///
    /// The response's `total_results` becomes the `results` field; the other
    /// fields keep their names and values.
    fn from(response: malwaredb_client::malwaredb_api::SearchResponse) -> Self {
        let hashes = response.hashes;
        let results = response.total_results;
        let pagination = response.pagination;
        let message = response.message;
        Self {
            hashes,
            results,
            pagination,
            message,
        }
    }
}
/// Result of a Yara search, exposed to Python as an immutable (`frozen`)
/// class.
#[pyclass(frozen)]
#[derive(Debug)]
pub struct YaraResult {
/// Map from a string key to a list of strings, taken from the API's Yara
/// search response with each list element converted via `to_string()`.
/// NOTE(review): presumably rule name -> matching sample hashes --
/// confirm against the API definition.
#[pyo3(get)]
pub results: HashMap<String, Vec<String>>,
}
impl YaraResult {
#[must_use]
pub fn __len__(&self) -> usize {
self.results.len()
}
}
impl From<malwaredb_client::malwaredb_api::YaraSearchResponse> for YaraResult {
    /// Builds the Python-facing [`YaraResult`] from the API response.
    ///
    /// Keys are kept as they are; every element of each value list is
    /// converted to a `String` with `to_string()`.
    fn from(response: malwaredb_client::malwaredb_api::YaraSearchResponse) -> Self {
        let results = response
            .results
            .into_iter()
            .map(|(key, values)| {
                let rendered = values.into_iter().map(|item| item.to_string()).collect();
                (key, rendered)
            })
            .collect();
        Self { results }
    }
}
/// A Malware DB server described by host, port and scheme, exposed to
/// Python as an immutable (`frozen`) class whose fields are readable
/// attributes.
#[pyclass(frozen, from_py_object)]
#[derive(Debug, Clone)]
pub struct DiscoveredServer {
/// Host part of the server URL.
#[pyo3(get)]
pub host: String,
/// Port part of the server URL.
#[pyo3(get)]
pub port: u16,
/// When `true` the URL is rendered with `https://`, otherwise `http://`.
#[pyo3(get)]
pub ssl: bool,
/// Name of the server; shown by `__repr__`.
#[pyo3(get)]
pub name: String,
/// Server information, if it could be retrieved when this value was
/// built; `None` otherwise.
#[pyo3(get)]
pub info: Option<ServerInfo>,
}
#[pymethods]
impl DiscoveredServer {
    /// Log in to this server and return a client for it.
    ///
    /// The server URL is the same string `__str__` produces. `username`,
    /// `password`, `save` (default `false`) and `cert_path` (default `None`)
    /// are handed to `MalwareDBClient::login` unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever error `MalwareDBClient::login` returns.
    #[pyo3(signature = (username, password, save = false, cert_path = None))]
    pub fn connect(
        &self,
        username: String,
        password: String,
        save: bool,
        cert_path: Option<PathBuf>,
    ) -> PyResult<crate::MalwareDBClient> {
        let url = self.__str__();
        crate::MalwareDBClient::login(url, username, password, save, cert_path)
    }

    /// Debug-style representation: `<name> at <scheme>://<host>:<port>`,
    /// with `https` when `ssl` is set and `http` otherwise.
    #[must_use]
    pub fn __repr__(&self) -> String {
        if !self.ssl {
            return format!("{} at http://{}:{}", self.name, self.host, self.port);
        }
        format!("{} at https://{}:{}", self.name, self.host, self.port)
    }

    /// String form: the server URL, `<scheme>://<host>:<port>`, with `https`
    /// when `ssl` is set and `http` otherwise.
    #[must_use]
    pub fn __str__(&self) -> String {
        if !self.ssl {
            return format!("http://{}:{}", self.host, self.port);
        }
        format!("https://{}:{}", self.host, self.port)
    }
}
impl From<malwaredb_client::MalwareDBServer> for DiscoveredServer {
fn from(value: malwaredb_client::MalwareDBServer) -> Self {
let info = value.server_info_blocking().ok();
Self {
host: value.host,
port: value.port,
ssl: value.ssl,
name: value.name,
info: info.map(Into::into),
}
}
}