arche 3.0.0

An opinionated backend foundation for Axum applications, providing batteries-included integrations for cloud services, databases, authentication, middleware, and logging.
Documentation
use std::collections::HashMap;

use serde::de::DeserializeOwned;
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

use crate::error::AppError;

pub struct JsonArrayStream {
    receiver: mpsc::Receiver<Result<Value, AppError>>,
    metadata: HashMap<String, Value>,
    _handle: JoinHandle<()>,
}

impl JsonArrayStream {
    pub(crate) fn new(
        receiver: mpsc::Receiver<Result<Value, AppError>>,
        metadata: HashMap<String, Value>,
        handle: JoinHandle<()>,
    ) -> Self {
        Self {
            receiver,
            metadata,
            _handle: handle,
        }
    }

    pub fn field<T: DeserializeOwned>(&self, name: &str) -> Result<T, AppError> {
        let value = self.metadata.get(name).ok_or_else(|| {
            AppError::internal_error(format!("JSON field not found: {name}"), None)
        })?;
        serde_json::from_value(value.clone())
            .map_err(|e| AppError::internal_error(format!("JSON field type mismatch: {e}"), None))
    }

    pub fn metadata(&self) -> &HashMap<String, Value> {
        &self.metadata
    }

    pub async fn next<T: DeserializeOwned>(&mut self) -> Option<Result<T, AppError>> {
        match self.receiver.recv().await {
            Some(Ok(value)) => Some(serde_json::from_value(value).map_err(|e| {
                AppError::internal_error(format!("JSON deserialization error: {e}"), None)
            })),
            Some(Err(e)) => Some(Err(e)),
            None => None,
        }
    }

    pub async fn next_batch<T: DeserializeOwned>(
        &mut self,
        batch_size: usize,
    ) -> Option<Result<Vec<T>, AppError>> {
        let mut batch = Vec::with_capacity(batch_size);
        for _ in 0..batch_size {
            match self.next::<T>().await {
                Some(Ok(item)) => batch.push(item),
                Some(Err(e)) => return Some(Err(e)),
                None => break,
            }
        }
        if batch.is_empty() {
            None
        } else {
            Some(Ok(batch))
        }
    }
}