#![doc = include_str!("../README.md")]
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![forbid(unsafe_code)]
pub mod cart;
pub mod types;
use crate::types::{
DiscoveredServer, Label, SearchResults, ServerInfo, Source, SupportedFileType, UserInfo,
YaraResult,
};
use malwaredb_client::blocking::MdbClient;
use std::borrow::Cow;
use std::path::PathBuf;
use anyhow::{anyhow, Result};
use pyo3::prelude::*;
use uuid::Uuid;
pub const MDB_VERSION: &str = env!("CARGO_PKG_VERSION");
pub const VERSION: &str = concat!(env!("MDB_VERSION"), " ", env!("MDB_BUILD_DATE"));
#[pyclass(frozen)]
pub struct MalwareDBClient {
inner: MdbClient,
}
#[pymethods]
impl MalwareDBClient {
#[new]
pub fn new() -> PyResult<Self> {
Ok(MalwareDBClient {
inner: MdbClient::load()?,
})
}
#[staticmethod]
pub fn login(
url: String,
username: String,
password: String,
save: bool,
cert_path: Option<PathBuf>,
) -> PyResult<Self> {
Ok(MalwareDBClient {
inner: MdbClient::login(url, username, password, save, cert_path)?,
})
}
#[staticmethod]
pub fn connect(url: String, api_key: String, cert_path: Option<PathBuf>) -> PyResult<Self> {
Ok(MalwareDBClient {
inner: MdbClient::new(url, api_key, cert_path)?,
})
}
#[staticmethod]
pub fn discover() -> Result<Vec<DiscoveredServer>> {
malwaredb_client::discover_servers().map(|s| s.into_iter().map(Into::into).collect())
}
#[staticmethod]
pub fn from_file(path: PathBuf) -> Result<Self> {
Ok(MalwareDBClient {
inner: MdbClient::from_file(path)?,
})
}
#[getter]
#[must_use]
pub fn url(&self) -> String {
self.inner.url.clone()
}
#[pyo3(signature = (hash, cart = false))]
pub fn get_file_bytes(&self, hash: &str, cart: bool) -> Result<Cow<'_, [u8]>> {
self.inner.retrieve(hash, cart).map(Cow::from)
}
pub fn submit_file(
&self,
contents: Vec<u8>,
file_name: String,
source_id: u32,
) -> Result<bool> {
self.inner.submit(contents, file_name, source_id)
}
#[allow(clippy::too_many_arguments)]
#[pyo3(signature = (hash = None, hash_type = "sha256", file_name = None, labels = None, file_type = None, magic = None, response_hash = "sha256", limit = 100))]
pub fn search(
&self,
hash: Option<String>,
hash_type: &str,
file_name: Option<String>,
labels: Option<Vec<String>>,
file_type: Option<String>,
magic: Option<String>,
response_hash: &str,
limit: u32,
) -> Result<SearchResults> {
let hash_type = hash_type.try_into().map_err(|e: String| anyhow!(e))?;
let response_hash = response_hash.try_into().map_err(|e: String| anyhow!(e))?;
self.inner
.partial_search_labels_type(
hash.map(|h| (hash_type, h)),
file_name,
response_hash,
labels,
file_type,
magic,
limit,
)
.map(Into::into)
}
pub fn get_sources(&self) -> Result<Vec<Source>> {
let sources = self
.inner
.sources()?
.sources
.into_iter()
.map(Into::into)
.collect();
Ok(sources)
}
pub fn server_info(&self) -> Result<ServerInfo> {
Ok(self.inner.server_info()?.into())
}
pub fn get_supported_file_types(&self) -> Result<Vec<SupportedFileType>> {
let supported_types = self
.inner
.supported_types()?
.types
.into_iter()
.map(Into::into)
.collect();
Ok(supported_types)
}
pub fn whoami(&self) -> Result<UserInfo> {
self.inner.whoami().map(Into::into)
}
pub fn labels(&self) -> Result<Vec<Label>> {
self.inner
.labels()
.map(|labels| labels.0.into_iter().map(Into::into).collect())
}
pub fn yara_search(&self, query: &str) -> Result<Uuid> {
Ok(self.inner.yara_search(query)?.uuid)
}
pub fn yara_result(&self, uuid: Uuid) -> Result<YaraResult> {
self.inner.yara_result(uuid).map(Into::into)
}
}
#[cfg(not(feature = "rust_lib"))]
#[pymodule]
fn malwaredb(m: &Bound<'_, PyModule>) -> PyResult<()> {
if let Err(log_error) = pyo3_log::try_init() {
eprintln!("Failed to enable logging: {log_error}");
}
m.add_class::<MalwareDBClient>()?;
m.add_class::<Label>()?;
m.add_class::<ServerInfo>()?;
m.add_class::<Source>()?;
m.add_class::<SupportedFileType>()?;
m.add_class::<UserInfo>()?;
cart::register_cart_module(m)?;
m.add("__version__", MDB_VERSION)?;
m.add("full_version", VERSION)?;
Ok(())
}