probabilistic-rs 0.6.4

Probabilistic data structures in Rust
Documentation
use axum::{
    Json, Router,
    extract::{Path, State},
    routing::{delete, get, post},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
use utoipa::ToSchema;

use crate::ebloom::{
    BulkExpiringBloomFilterOps, ExpiringBloomFilter, ExpiringBloomFilterOps,
    ExpiringBloomFilterStats, ExpiringFilterConfigBuilder,
};
use crate::server::error::{ApiError, ApiResult};
use crate::server::state::AppState;

#[derive(Debug, Deserialize, ToSchema)]
pub struct CreateEbloomRequest {
    pub name: String,
    #[schema(default = 1_000_000)]
    pub capacity: usize,
    #[schema(default = 0.01)]
    pub fpr: f64,
    #[schema(default = 3600)]
    pub ttl_secs: u64,
    #[schema(default = 3)]
    pub num_levels: usize,
}

// Body for the single-item endpoints (insert / contains). The handlers feed
// the string's UTF-8 bytes to the filter.
#[derive(Deserialize, ToSchema, Debug)]
pub struct ItemRequest {
    pub item: String,
}

// Body for the bulk endpoints; each entry is handled like `ItemRequest::item`.
#[derive(Deserialize, ToSchema, Debug)]
pub struct BulkItemRequest {
    pub items: Vec<String>,
}

// Result of a single membership query, as reported by the filter.
#[derive(Serialize, ToSchema, Debug)]
pub struct ContainsResponse {
    pub present: bool,
}

// Results of a bulk membership query; the handler returns the filter's
// `contains_bulk` output unchanged.
#[derive(Serialize, ToSchema, Debug)]
pub struct BulkContainsResponse {
    pub results: Vec<bool>,
}

// Snapshot of a filter's configuration and counters for the stats endpoint.
// Field order is kept as-is because it determines the serialized JSON order.
#[derive(Serialize, ToSchema, Debug)]
pub struct EbloomStatsResponse {
    // Copied from the filter's `capacity_per_level()`.
    pub capacity_per_level: usize,
    // Copied from the filter's `target_fpr()`.
    pub target_fpr: f64,
    // Copied from the filter's `total_insert_count()`.
    pub total_insert_count: u64,
    // Copied from the filter's `active_levels()`.
    pub active_levels: usize,
    // Copied from the filter's `num_levels()`.
    pub num_levels: usize,
}

// Names of all registered expiring bloom filters.
#[derive(Serialize, ToSchema, Debug)]
pub struct FilterListResponse {
    pub filters: Vec<String>,
}

// Generic human-readable acknowledgement returned by mutating endpoints.
#[derive(Serialize, ToSchema, Debug)]
pub struct MessageResponse {
    pub message: String,
}

/// Builds the routes for the expiring bloom filter API.
///
/// Paths are relative. The OpenAPI annotations in this module document them
/// under `/api/v1/ebloom`, so the caller presumably nests this router there.
pub fn router() -> Router<AppState> {
    // Collection-level endpoints: no filter name in the path.
    let collection = Router::new()
        .route("/", post(create_filter))
        .route("/list", get(list_filters));

    // Per-filter endpoints, addressed by the `{name}` path segment.
    collection
        .route("/{name}", delete(delete_filter))
        .route("/{name}/insert", post(insert))
        .route("/{name}/contains", post(contains))
        .route("/{name}/bulk/insert", post(bulk_insert))
        .route("/{name}/bulk/contains", post(bulk_contains))
        .route("/{name}/clear", post(clear))
        .route("/{name}/stats", get(stats))
}

/// Creates a new expiring bloom filter and registers it under `req.name`.
///
/// `ttl_secs` is split evenly across `num_levels` levels in whole seconds
/// (integer division, truncating). A request with `num_levels == 0` is rejected
/// with an "Invalid config" error instead of panicking on the division.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom",
    request_body = CreateEbloomRequest,
    responses(
        (status = 200, description = "Filter created", body = MessageResponse),
        (status = 409, description = "Filter already exists", body = ApiError),
        (status = 500, description = "Internal error", body = ApiError)
    )
)]
pub async fn create_filter(
    State(state): State<AppState>,
    Json(req): Json<CreateEbloomRequest>,
) -> ApiResult<Json<MessageResponse>> {
    // The write lock is held through creation so that two concurrent requests
    // for the same name cannot both pass the existence check.
    let mut eblooms = state.eblooms.write().await;

    if eblooms.contains_key(&req.name) {
        return Err(ApiError::already_exists("Expiring bloom filter", &req.name));
    }

    // `checked_div` turns a zero level count into a client-facing error rather
    // than a divide-by-zero panic inside the handler.
    // NOTE(review): ttl_secs < num_levels still yields a zero level duration;
    // confirm whether the config builder rejects that.
    let level_secs = req
        .ttl_secs
        .checked_div(req.num_levels as u64)
        .ok_or_else(|| {
            ApiError::with_details(
                "Invalid config",
                "num_levels must be greater than zero".to_string(),
            )
        })?;

    let config = ExpiringFilterConfigBuilder::default()
        .capacity_per_level(req.capacity)
        .target_fpr(req.fpr)
        .level_duration(Duration::from_secs(level_secs))
        .num_levels(req.num_levels)
        .build()
        .map_err(|e| ApiError::with_details("Invalid config", e.to_string()))?;

    let filter = ExpiringBloomFilter::create(config)
        .await
        .map_err(ApiError::from)?;
    eblooms.insert(req.name.clone(), Arc::new(filter));

    Ok(Json(MessageResponse {
        message: format!("Expiring bloom filter '{}' created", req.name),
    }))
}

// Handler for DELETE /{name}: unregisters the named filter.
// A missing name maps to ApiError::not_found.
#[utoipa::path(
    delete,
    path = "/api/v1/ebloom/{name}",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter deleted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn delete_filter(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<MessageResponse>> {
    let mut registry = state.eblooms.write().await;

    registry
        .remove(&name)
        .map(|_removed| {
            Json(MessageResponse {
                message: format!("Expiring bloom filter '{name}' deleted"),
            })
        })
        .ok_or_else(|| ApiError::not_found("Expiring bloom filter", &name))
}

// Handler for POST /{name}/insert: adds one item (its UTF-8 bytes) to the
// named filter. An unknown name maps to ApiError::not_found; filter errors are
// converted through ApiError::from.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom/{name}/insert",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = ItemRequest,
    responses(
        (status = 200, description = "Item inserted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn insert(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<ItemRequest>,
) -> ApiResult<Json<MessageResponse>> {
    let registry = state.eblooms.read().await;
    let Some(target) = registry.get(&name) else {
        return Err(ApiError::not_found("Expiring bloom filter", &name));
    };

    target
        .insert(req.item.as_bytes())
        .map_err(ApiError::from)?;

    let message = String::from("Item inserted");
    Ok(Json(MessageResponse { message }))
}

// Handler for POST /{name}/contains: asks the named filter whether the item's
// UTF-8 bytes are present. An unknown name maps to ApiError::not_found.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom/{name}/contains",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = ItemRequest,
    responses(
        (status = 200, description = "Check result", body = ContainsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn contains(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<ItemRequest>,
) -> ApiResult<Json<ContainsResponse>> {
    let registry = state.eblooms.read().await;
    let Some(target) = registry.get(&name) else {
        return Err(ApiError::not_found("Expiring bloom filter", &name));
    };

    target
        .contains(req.item.as_bytes())
        .map(|present| Json(ContainsResponse { present }))
        .map_err(ApiError::from)
}

// Handler for POST /{name}/bulk/insert: inserts every item's UTF-8 bytes in
// one insert_bulk call and reports how many items were submitted.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom/{name}/bulk/insert",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = BulkItemRequest,
    responses(
        (status = 200, description = "Items inserted", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn bulk_insert(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<BulkItemRequest>,
) -> ApiResult<Json<MessageResponse>> {
    let registry = state.eblooms.read().await;
    let target = match registry.get(&name) {
        Some(found) => found,
        None => return Err(ApiError::not_found("Expiring bloom filter", &name)),
    };

    let payload: Vec<&[u8]> = req.items.iter().map(String::as_bytes).collect();
    target.insert_bulk(&payload).map_err(ApiError::from)?;

    let submitted = req.items.len();
    Ok(Json(MessageResponse {
        message: format!("{submitted} items inserted"),
    }))
}

// Handler for POST /{name}/bulk/contains: runs contains_bulk over every
// item's UTF-8 bytes and returns the filter's answers unchanged.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom/{name}/bulk/contains",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    request_body = BulkItemRequest,
    responses(
        (status = 200, description = "Check results", body = BulkContainsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn bulk_contains(
    State(state): State<AppState>,
    Path(name): Path<String>,
    Json(req): Json<BulkItemRequest>,
) -> ApiResult<Json<BulkContainsResponse>> {
    let registry = state.eblooms.read().await;
    let target = match registry.get(&name) {
        Some(found) => found,
        None => return Err(ApiError::not_found("Expiring bloom filter", &name)),
    };

    let payload: Vec<&[u8]> = req.items.iter().map(String::as_bytes).collect();
    target
        .contains_bulk(&payload)
        .map(|results| Json(BulkContainsResponse { results }))
        .map_err(ApiError::from)
}

// Handler for POST /{name}/clear: calls clear() on the named filter, which
// stays registered. An unknown name maps to ApiError::not_found.
#[utoipa::path(
    post,
    path = "/api/v1/ebloom/{name}/clear",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter cleared", body = MessageResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn clear(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<MessageResponse>> {
    let registry = state.eblooms.read().await;

    match registry.get(&name) {
        Some(target) => {
            target.clear().map_err(ApiError::from)?;
            Ok(Json(MessageResponse {
                message: format!("Expiring bloom filter '{name}' cleared"),
            }))
        }
        None => Err(ApiError::not_found("Expiring bloom filter", &name)),
    }
}

// Handler for GET /{name}/stats: copies the filter's getter values into an
// EbloomStatsResponse. An unknown name maps to ApiError::not_found.
#[utoipa::path(
    get,
    path = "/api/v1/ebloom/{name}/stats",
    params(
        ("name" = String, Path, description = "Filter name")
    ),
    responses(
        (status = 200, description = "Filter statistics", body = EbloomStatsResponse),
        (status = 404, description = "Filter not found", body = ApiError)
    )
)]
pub async fn stats(
    State(state): State<AppState>,
    Path(name): Path<String>,
) -> ApiResult<Json<EbloomStatsResponse>> {
    let registry = state.eblooms.read().await;

    registry
        .get(&name)
        .map(|target| {
            Json(EbloomStatsResponse {
                capacity_per_level: target.capacity_per_level(),
                target_fpr: target.target_fpr(),
                total_insert_count: target.total_insert_count(),
                active_levels: target.active_levels(),
                num_levels: target.num_levels(),
            })
        })
        .ok_or_else(|| ApiError::not_found("Expiring bloom filter", &name))
}

/// Lists the names of all registered expiring bloom filters.
///
/// Names are sorted so the response is stable across calls; the registry's own
/// iteration order is not guaranteed (e.g. for a hash map).
#[utoipa::path(
    get,
    path = "/api/v1/ebloom/list",
    responses(
        (status = 200, description = "List of filters, sorted by name", body = FilterListResponse)
    )
)]
pub async fn list_filters(
    State(state): State<AppState>,
) -> Json<FilterListResponse> {
    // Copy the names out under the read lock, then release it before sorting.
    let mut filters: Vec<String> = {
        let eblooms = state.eblooms.read().await;
        eblooms.keys().cloned().collect()
    };
    filters.sort_unstable();
    Json(FilterListResponse { filters })
}