malwaredb-client-py 0.3.3

Python client for MalwareDB.
Documentation
// SPDX-License-Identifier: Apache-2.0

#![doc = include_str!("../README.md")]
#![deny(clippy::all)]
#![deny(clippy::pedantic)]
#![forbid(unsafe_code)]

/// `CaRT` file I/O
pub mod cart;

/// Python wrapper types for some Malware DB API types
pub mod types;

use crate::types::{
    DiscoveredServer, Label, SearchResults, ServerInfo, Source, SupportedFileType, UserInfo,
    YaraResult,
};
use malwaredb_client::blocking::MdbClient;

use std::borrow::Cow;
use std::path::PathBuf;

use anyhow::{anyhow, Result};
use pyo3::prelude::*;
use uuid::Uuid;

/// MDB version: this crate's package version, captured from `CARGO_PKG_VERSION` at compile time
pub const MDB_VERSION: &str = env!("CARGO_PKG_VERSION");

/// Full version string: the compile-time `MDB_VERSION` and `MDB_BUILD_DATE` environment
/// variables joined by a single space. Both variables must be set when compiling
/// (presumably by a build script; TODO confirm).
pub const VERSION: &str = concat!(env!("MDB_VERSION"), " ", env!("MDB_BUILD_DATE"));

/// Malware DB client
///
/// Wraps the blocking `MdbClient` and is exposed to Python as a `frozen` class.
#[pyclass(frozen)]
pub struct MalwareDBClient {
    /// Underlying blocking client which the methods of this type delegate to
    inner: MdbClient,
}

#[pymethods]
impl MalwareDBClient {
    /// Load a configuration from a file if it can be found
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration file can't be found or isn't valid.
    #[new]
    pub fn new() -> PyResult<Self> {
        Ok(MalwareDBClient {
            inner: MdbClient::load()?,
        })
    }

    /// Login with a username and password
    ///
    /// # Errors
    ///
    /// Returns an error if the server URL, username, or password were incorrect, or if the server
    /// was unreachable.
    #[staticmethod]
    pub fn login(
        url: String,
        username: String,
        password: String,
        save: bool,
        cert_path: Option<PathBuf>,
    ) -> PyResult<Self> {
        Ok(MalwareDBClient {
            inner: MdbClient::login(url, username, password, save, cert_path)?,
        })
    }

    /// Connect if an API key is already known
    ///
    /// # Errors
    ///
    /// Returns an error if the certificate was not in the expected DER or PEM format or could not be parsed.
    #[staticmethod]
    pub fn connect(url: String, api_key: String, cert_path: Option<PathBuf>) -> PyResult<Self> {
        Ok(MalwareDBClient {
            inner: MdbClient::new(url, api_key, cert_path)?,
        })
    }

    /// Find Malware DB servers on the local network using multicast DNS (also known as
    /// Bonjour, Zeroconf)
    ///
    /// # Errors
    ///
    /// A network problem could result in an error.
    #[staticmethod]
    pub fn discover() -> Result<Vec<DiscoveredServer>> {
        let found = malwaredb_client::discover_servers()?;
        // Convert each discovered entry into its Python wrapper type.
        Ok(found.into_iter().map(Into::into).collect())
    }

    /// Connect using a specific configuration file
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration file cannot be read, possibly because it
    /// doesn't exist or due to a permission error or a parsing error.
    #[staticmethod]
    pub fn from_file(path: PathBuf) -> Result<Self> {
        Ok(MalwareDBClient {
            inner: MdbClient::from_file(path)?,
        })
    }

    /// The server's URL, returned as an owned copy
    #[getter]
    #[must_use]
    pub fn url(&self) -> String {
        let server_url = &self.inner.url;
        server_url.clone()
    }

    /// Fetch a sample's bytes from the database by hash; pass `cart = true` to request it
    /// as a `CaRT` file instead.
    ///
    /// # Errors
    ///
    /// This may return an error if the server is unreachable or if the user is not logged in
    /// or not properly authorized to connect.
    #[pyo3(signature = (hash, cart = false))]
    pub fn get_file_bytes(&self, hash: &str, cart: bool) -> Result<Cow<'_, [u8]>> {
        let sample = self.inner.retrieve(hash, cart)?;
        Ok(Cow::from(sample))
    }

    /// Upload a file to the database under the given file name and source ID. Returns true
    /// if stored.
    ///
    /// # Errors
    ///
    /// This may return an error if the server is unreachable or if the user is not logged in
    /// or not properly authorized to connect.
    pub fn submit_file(
        &self,
        contents: Vec<u8>,
        file_name: String,
        source_id: u32,
    ) -> Result<bool> {
        let stored = self.inner.submit(contents, file_name, source_id)?;
        Ok(stored)
    }

    /// Search by partial hash, partial file name, file type, file command output, or labels;
    /// returning a list of hashes by specified hash type
    ///
    /// From Python every argument is optional: `hash_type` and `response_hash` default to
    /// `"sha256"`, `limit` defaults to `100`, and the remaining filters default to `None`.
    /// Both `hash_type` and `response_hash` are parsed before the request is made, so an
    /// invalid `hash_type` is rejected even when `hash` is `None`.
    ///
    /// # Errors
    ///
    /// * Invalid hash types will result in an error
    /// * This may return an error if the server isn't reachable or if the user is not logged in
    ///   or the request isn't valid
    // One parameter per Python keyword argument, hence the long argument list.
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (hash = None, hash_type = "sha256", file_name = None, labels = None, file_type = None, magic = None, response_hash = "sha256", limit = 100))]
    pub fn search(
        &self,
        hash: Option<String>,
        hash_type: &str,
        file_name: Option<String>,
        labels: Option<Vec<String>>,
        file_type: Option<String>,
        magic: Option<String>,
        response_hash: &str,
        limit: u32,
    ) -> Result<SearchResults> {
        // The conversion target types are inferred from the client call below; a failed
        // conversion yields a `String` message which is wrapped into an `anyhow` error.
        let hash_type = hash_type.try_into().map_err(|e: String| anyhow!(e))?;
        let response_hash = response_hash.try_into().map_err(|e: String| anyhow!(e))?;
        self.inner
            .partial_search_labels_type(
                // The hash type only travels with the request when a hash was supplied.
                hash.map(|h| (hash_type, h)),
                file_name,
                response_hash,
                labels,
                file_type,
                magic,
                limit,
            )
            .map(Into::into)
    }

    /// List the sources the user has access to
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or if the user is not logged in.
    pub fn get_sources(&self) -> Result<Vec<Source>> {
        let response = self.inner.sources()?;
        // Convert each entry of the response into its Python wrapper type.
        Ok(response.sources.into_iter().map(Into::into).collect())
    }

    /// Retrieve information about the server
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable.
    pub fn server_info(&self) -> Result<ServerInfo> {
        let info = self.inner.server_info()?;
        Ok(info.into())
    }

    /// List the file types the server supports; Malware DB only accepts file types it
    /// knows about
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or the user is not logged in.
    pub fn get_supported_file_types(&self) -> Result<Vec<SupportedFileType>> {
        let response = self.inner.supported_types()?;
        // Convert each entry of the response into its Python wrapper type.
        Ok(response.types.into_iter().map(Into::into).collect())
    }

    /// Retrieve information about the user
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or if the user is not logged in
    /// or not properly authorized to connect.
    pub fn whoami(&self) -> Result<UserInfo> {
        let user = self.inner.whoami()?;
        Ok(user.into())
    }

    /// Retrieve the labels
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or if the user is not logged in
    /// or not properly authorized to connect.
    pub fn labels(&self) -> Result<Vec<Label>> {
        let response = self.inner.labels()?;
        // Field `0` of the response holds the label collection; wrap each entry for Python.
        Ok(response.0.into_iter().map(Into::into).collect())
    }

    /// Submit a Yara rule search and return the UUID from the server's response
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or if the user is not logged in
    /// or not properly authorized to connect.
    pub fn yara_search(&self, query: &str) -> Result<Uuid> {
        let response = self.inner.yara_search(query)?;
        Ok(response.uuid)
    }

    /// Retrieve the result of a Yara search by its UUID
    ///
    /// # Errors
    ///
    /// This may return an error if the server isn't reachable or if the user is not logged in
    /// or not properly authorized to connect.
    pub fn yara_result(&self, uuid: Uuid) -> Result<YaraResult> {
        let result = self.inner.yara_result(uuid)?;
        Ok(result.into())
    }
}

/// Only used by this crate directly to register the module. If this crate is used as a module,
/// that other crate must register the Rust types with that new Python module.
///
/// Registers the client and wrapper classes, hands the module to `cart::register_cart_module`,
/// and sets the `__version__` and `full_version` module attributes.
///
/// # Errors
///
/// Returns an error if adding a class or attribute to the module fails, or if
/// `cart::register_cart_module` fails.
#[cfg(not(feature = "rust_lib"))]
#[pymodule]
fn malwaredb(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Logging is best-effort: a failure is reported on stderr and module setup continues.
    if let Err(log_error) = pyo3_log::try_init() {
        eprintln!("Failed to enable logging: {log_error}");
    }

    // NOTE(review): `DiscoveredServer`, `SearchResults`, and `YaraResult` are returned by
    // `MalwareDBClient` methods but are not added to the module here -- confirm whether
    // that omission is intentional.
    m.add_class::<MalwareDBClient>()?;
    m.add_class::<Label>()?;
    m.add_class::<ServerInfo>()?;
    m.add_class::<Source>()?;
    m.add_class::<SupportedFileType>()?;
    m.add_class::<UserInfo>()?;
    cart::register_cart_module(m)?;
    m.add("__version__", MDB_VERSION)?;
    m.add("full_version", VERSION)?;
    Ok(())
}