probabilistic-rs 0.6.4

Probabilistic data structures in Rust
Documentation
use axum::{
    Json, Router,
    extract::{Path, State},
    routing::{delete, get, post},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use utoipa::ToSchema;

use crate::bloom::{
    BloomFilter, BloomFilterConfigBuilder, BloomFilterOps, BloomFilterStats,
    BulkBloomFilterOps,
};
use crate::server::error::{ApiError, ApiResult};
use crate::server::state::AppState;

#[derive(Debug, Deserialize, ToSchema)]
pub struct CreateBloomRequest {
    pub name: String,
    #[schema(default = 1_000_000)]
    pub capacity: usize,
    #[schema(default = 0.01)]
    pub fpr: f64,
}

#[derive(Debug, Deserialize, ToSchema)]
pub struct ItemRequest {
    pub item: String,
}

#[derive(Debug, Deserialize, ToSchema)]
pub struct BulkItemRequest {
    pub items: Vec<String>,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct ContainsResponse {
    pub present: bool,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct BulkContainsResponse {
    pub results: Vec<bool>,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct BloomStatsResponse {
    pub capacity: usize,
    pub false_positive_rate: f64,
    pub insert_count: usize,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct FilterListResponse {
    pub filters: Vec<String>,
}

#[derive(Debug, Serialize, ToSchema)]
pub struct MessageResponse {
    pub message: String,
}

pub fn router() -> Router<AppState> {
    Router::new()
        .route("/", post(create_filter))
        .route("/list", get(list_filters))
        .route("/{name}", delete(delete_filter))
        .route("/{name}/insert", post(insert))
        .route("/{name}/contains", post(contains))
        .route("/{name}/bulk/insert", post(bulk_insert))
        .route("/{name}/bulk/contains", post(bulk_contains))
        .route("/{name}/clear", post(clear))
        .route("/{name}/stats", get(stats))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom",
    request_body = CreateBloomRequest,
    responses(
        (status = 201, description = "Filter created", body = MessageResponse),
        (status = 409, description = "Filter already exists", body = ApiError),
        (status = 500, description = "Internal error", body = ApiError)
    )
)]
pub async fn create_filter(
    State(state): State<AppState>,
    Json(req): Json<CreateBloomRequest>,
) -> ApiResult<Json<MessageResponse>> {
    let mut blooms = state.blooms.write().await;

    if blooms.contains_key(&req.name) {
        return Err(ApiError::already_exists("Bloom filter", &req.name));
    }

    let config = BloomFilterConfigBuilder::default()
        .capacity(req.capacity)
        .false_positive_rate(req.fpr)
        .build()
        .map_err(|e| ApiError::with_details("Invalid config", e.to_string()))?;

    let filter = BloomFilter::create(config).await.map_err(ApiError::from)?;
    blooms.insert(req.name.clone(), Arc::new(filter));

    Ok(Json(MessageResponse {
        message: format!("Bloom filter '{}' created", req.name),
    }))
}

#[utoipa::path(
    delete,
    path = "/api/v1/bloom/{name}",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter deleted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn delete_filter(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<MessageResponse>> {
    let mut blooms = state.blooms.write().await;

    if blooms.remove(&name).is_none() {
        return Err(ApiError::not_found("Bloom filter", &name));
    }

    Ok(Json(MessageResponse {
        message: format!("Bloom filter '{}' deleted", name),
    }))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom/{name}/insert",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = ItemRequest,
    responses(
        (status = 200, description = "Item inserted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn insert(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<ItemRequest>,
) -> ApiResult<Json<MessageResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    filter.insert(req.item.as_bytes()).map_err(ApiError::from)?;

    Ok(Json(MessageResponse {
        message: "Item inserted".to_string(),
    }))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom/{name}/contains",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = ItemRequest,
    responses(
        (status = 200, description = "Check result", body = ContainsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn contains(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<ItemRequest>,
) -> ApiResult<Json<ContainsResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    let present = filter
        .contains(req.item.as_bytes())
        .map_err(ApiError::from)?;

    Ok(Json(ContainsResponse { present }))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom/{name}/bulk/insert",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = BulkItemRequest,
    responses(
        (status = 200, description = "Items inserted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn bulk_insert(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<BulkItemRequest>,
) -> ApiResult<Json<MessageResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    let items: Vec<&[u8]> = req.items.iter().map(|s| s.as_bytes()).collect();
    filter.insert_bulk(&items).map_err(ApiError::from)?;

    Ok(Json(MessageResponse {
        message: format!("{} items inserted", req.items.len()),
    }))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom/{name}/bulk/contains",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = BulkItemRequest,
    responses(
        (status = 200, description = "Check results", body = BulkContainsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn bulk_contains(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<BulkItemRequest>,
) -> ApiResult<Json<BulkContainsResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    let items: Vec<&[u8]> = req.items.iter().map(|s| s.as_bytes()).collect();
    let results = filter.contains_bulk(&items).map_err(ApiError::from)?;

    Ok(Json(BulkContainsResponse { results }))
}

#[utoipa::path(
    post,
    path = "/api/v1/bloom/{name}/clear",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter cleared", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn clear(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<MessageResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    filter.clear().map_err(ApiError::from)?;

    Ok(Json(MessageResponse {
        message: format!("Bloom filter '{}' cleared", name),
    }))
}

#[utoipa::path(
    get,
    path = "/api/v1/bloom/{name}/stats",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter statistics", body = BloomStatsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn stats(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<BloomStatsResponse>> {
    let blooms = state.blooms.read().await;

    let filter = blooms
        .get(&name)
        .ok_or_else(|| ApiError::not_found("Bloom filter", &name))?;

    Ok(Json(BloomStatsResponse {
        capacity: filter.capacity(),
        false_positive_rate: filter.false_positive_rate(),
        insert_count: filter.insert_count(),
    }))
}

#[utoipa::path(
    get,
    path = "/api/v1/bloom/list",
    responses(
        (status = 200, description = "List of filters", body = FilterListResponse)
    )
)]
pub async fn list_filters(
    State(state): State<AppState>,
) -> Json<FilterListResponse> {
    let blooms = state.blooms.read().await;
    let filters: Vec<String> = blooms.keys().cloned().collect();
    Json(FilterListResponse { filters })
}