arche 3.0.0

An opinionated backend foundation for Axum applications, providing batteries-included integrations for cloud services, databases, authentication, middleware, and logging.
Documentation
use std::collections::HashMap;
use std::fmt;
use std::mem;

use serde::de::{self, DeserializeSeed, MapAccess, SeqAccess, Visitor};
use serde_json::Value;
use tokio::sync::{mpsc, oneshot};

use crate::error::AppError;

/// Visitor that walks a single JSON object and streams the elements of one
/// array-valued field instead of buffering the whole document.
///
/// Fields seen before the target field are collected and delivered once
/// through `metadata_tx`; fields that appear after the target field are not
/// delivered anywhere.
pub(crate) struct ObjectStreamVisitor {
    /// Key of the object field whose value is streamed element by element.
    pub target_field: String,
    /// One-shot channel that receives the non-target fields gathered so far.
    /// It fires when the target field is reached, or at the end of the object
    /// if the target field never appears.
    pub metadata_tx: Option<oneshot::Sender<HashMap<String, Value>>>,
    /// Channel that receives each element of the target array as `Ok(value)`.
    pub element_tx: mpsc::Sender<Result<Value, AppError>>,
}

impl<'de> Visitor<'de> for ObjectStreamVisitor {
    type Value = ();

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("a JSON object")
    }

    fn visit_map<M>(self, mut map: M) -> Result<(), M::Error>
    where
        M: MapAccess<'de>,
    {
        let mut metadata = HashMap::new();
        let mut metadata_tx = self.metadata_tx;
        let element_tx = self.element_tx;
        let target_field = self.target_field;

        while let Some(key) = map.next_key::<String>()? {
            if key == target_field {
                if let Some(tx) = metadata_tx.take() {
                    let _ = tx.send(mem::take(&mut metadata));
                }
                map.next_value_seed(ArrayStreamSeed {
                    element_tx: &element_tx,
                })?;
            } else {
                let value: Value = map.next_value()?;
                metadata.insert(key, value);
            }
        }

        if let Some(tx) = metadata_tx {
            let _ = tx.send(metadata);
        }

        Ok(())
    }
}

/// Seed/visitor pair that deserializes a sequence and forwards each element
/// to a channel instead of collecting the elements into a container.
pub(crate) struct ArrayStreamSeed<'a> {
    /// Borrowed sender on which every element is emitted as `Ok(value)`.
    pub element_tx: &'a mpsc::Sender<Result<Value, AppError>>,
}

impl<'de> DeserializeSeed<'de> for ArrayStreamSeed<'_> {
    type Value = ();

    /// Asks the deserializer for a sequence and uses `self` as the visitor
    /// that consumes it.
    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: de::Deserializer<'de>,
    {
        de::Deserializer::deserialize_seq(deserializer, self)
    }
}

impl<'de> Visitor<'de> for ArrayStreamSeed<'_> {
    type Value = ();

    fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("a JSON array")
    }

    /// Pulls elements from the sequence one at a time and pushes each onto
    /// `element_tx` as `Ok(value)`.
    ///
    /// Stops early, still returning `Ok(())`, as soon as a send fails, which
    /// happens when the receiving side of the channel is gone. Any elements
    /// not yet read are left unconsumed in that case.
    /// NOTE(review): how the driving deserializer reacts to an unfinished
    /// sequence is not visible here; confirm the caller tolerates it.
    ///
    /// NOTE(review): tokio documents `blocking_send` as panicking when called
    /// from an asynchronous execution context; presumably this visitor is
    /// driven from a blocking thread. Confirm at the call site.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the `SeqAccess` while reading an
    /// element.
    fn visit_seq<S>(self, mut seq: S) -> Result<Self::Value, S::Error>
    where
        S: SeqAccess<'de>,
    {
        let sender = self.element_tx;
        loop {
            let element = match seq.next_element::<Value>()? {
                Some(element) => element,
                // End of the array.
                None => return Ok(()),
            };
            if sender.blocking_send(Ok(element)).is_err() {
                // Receiver dropped: nobody is listening any more.
                return Ok(());
            }
        }
    }
}