link-common 0.5.2-rc.2

Shared Rust implementation for KalamDB link crates
Documentation
use base64::{engine::general_purpose::STANDARD, Engine};
use serde::Deserialize;

use crate::{
    consumer::models::{AckResponse, ConsumeMessage, ConsumeResponse, TopicOp},
    models::{RowData, UserId},
};

/// Request-side values used while decoding a consume response.
///
/// The decoder uses these to fill in message fields that the response body
/// does not carry. Every field falls back to its type's default value when it
/// is absent during deserialization.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct ConsumeRequestContext {
    /// Topic used for a decoded message when the response message has no
    /// `topic_id` of its own.
    #[serde(default)]
    pub topic_id: String,
    /// Consumer group copied onto every decoded message.
    #[serde(default)]
    pub group_id: String,
    /// Partition used for a decoded message when the response message has no
    /// `partition_id` of its own.
    #[serde(default)]
    pub partition_id: u32,
}

/// Request-side values used while decoding an ack response.
#[derive(Debug, Clone, Copy, Default, Deserialize)]
pub struct AckRequestContext {
    /// Offset reported as acknowledged when the ack response body omits
    /// `acknowledged_offset`. Defaults to `0` when absent during
    /// deserialization.
    #[serde(default)]
    pub upto_offset: u64,
}

/// Wire shape of an ack response body before defaults are applied.
///
/// Both fields are optional; `decode_ack_response` supplies the fallbacks.
#[derive(Debug, Deserialize)]
struct RawAckResponse {
    /// Success flag as sent in the response body, if any.
    success: Option<bool>,
    /// Acknowledged offset as sent in the response body, if any.
    acknowledged_offset: Option<u64>,
}

/// Wire shape of a consume response body before per-message decoding.
///
/// Every field falls back to its type's default (empty list, `0`, `false`)
/// when it is missing from the body.
#[derive(Debug, Deserialize)]
struct RawConsumeResponse {
    /// Messages exactly as received; each one still needs decoding.
    #[serde(default)]
    messages: Vec<RawConsumeMessage>,
    /// Passed through unchanged to the decoded response.
    #[serde(default)]
    next_offset: u64,
    /// Passed through unchanged to the decoded response.
    #[serde(default)]
    has_more: bool,
}

/// Wire shape of a single consumed message, tolerant of alternate field names.
///
/// Several fields accept more than one JSON key through serde aliases.
#[derive(Debug, Deserialize)]
struct RawConsumeMessage {
    /// Message key; also accepted under the JSON name `message_id`.
    #[serde(default, alias = "message_id")]
    key: Option<String>,
    /// Topic operation, if the message carries one.
    #[serde(default)]
    op: Option<TopicOp>,
    /// Message timestamp; accepted as `timestamp_ms` or `ts`.
    /// Presumably milliseconds, going by the field name — TODO confirm.
    // NOTE(review): `rename` repeats the field's own name, so only the `ts`
    // alias adds an accepted spelling here.
    #[serde(default, rename = "timestamp_ms", alias = "ts")]
    timestamp_ms: Option<u64>,
    /// Message offset; `0` when the field is missing.
    #[serde(default)]
    offset: u64,
    /// Partition of the message; the decoder falls back to the request
    /// context's partition when this is `None`.
    partition_id: Option<u32>,
    /// Topic of the message; the decoder falls back to the request context's
    /// topic when this is `None`.
    topic_id: Option<String>,
    /// User attached to the message; accepted as `user`, `username` or
    /// `user_id`. `decode_consume_message` rejects messages without it.
    #[serde(default, alias = "username", alias = "user_id")]
    user: Option<UserId>,
    /// Message payload as arbitrary JSON; accepted as `payload` or `value`.
    #[serde(default, alias = "value")]
    payload: Option<serde_json::Value>,
}

pub fn decode_consume_response(
    text: &str,
    request_context: &ConsumeRequestContext,
) -> Result<ConsumeResponse, String> {
    let raw: RawConsumeResponse = serde_json::from_str(text)
        .map_err(|error| format!("Failed to parse consume response: {}", error))?;
    let messages = raw
        .messages
        .into_iter()
        .map(|message| decode_consume_message(message, request_context))
        .collect::<Result<Vec<_>, _>>()?;

    Ok(ConsumeResponse {
        messages,
        next_offset: raw.next_offset,
        has_more: raw.has_more,
    })
}

/// Parses the JSON body of an ack response into an [`AckResponse`].
///
/// Fields missing from the body are filled in: `success` is treated as
/// `true`, and `acknowledged_offset` takes `request_context.upto_offset`.
///
/// # Errors
///
/// Returns a `String` describing the problem when `text` cannot be parsed as
/// an ack response.
pub fn decode_ack_response(
    text: &str,
    request_context: AckRequestContext,
) -> Result<AckResponse, String> {
    let parsed = match serde_json::from_str::<RawAckResponse>(text) {
        Ok(parsed) => parsed,
        Err(error) => return Err(format!("Failed to parse ack response: {}", error)),
    };

    // A body that parses but omits `success` counts as a successful ack.
    let success = match parsed.success {
        Some(flag) => flag,
        None => true,
    };
    // Without an explicit offset, report the offset the request asked for.
    let acknowledged_offset = match parsed.acknowledged_offset {
        Some(offset) => offset,
        None => request_context.upto_offset,
    };

    Ok(AckResponse {
        success,
        acknowledged_offset,
    })
}

fn decode_consume_message(
    raw: RawConsumeMessage,
    request_context: &ConsumeRequestContext,
) -> Result<ConsumeMessage, String> {
    let user = raw.user.ok_or_else(|| {
        "Consume response message is missing required user metadata; upgrade the server or republish the topic event with a user id".to_string()
    })?;

    Ok(ConsumeMessage {
        key: raw.key,
        op: raw.op,
        timestamp_ms: raw.timestamp_ms,
        offset: raw.offset,
        partition_id: raw.partition_id.unwrap_or(request_context.partition_id),
        topic: raw.topic_id.unwrap_or_else(|| request_context.topic_id.clone()),
        group_id: request_context.group_id.clone(),
        user,
        payload: decode_payload_value(raw.payload),
    })
}

/// Normalises a message payload into [`RowData`] on a best-effort basis.
///
/// * A string payload is first tried as base64 (`STANDARD` engine) wrapping a
///   JSON document; if either the base64 decode or the JSON parse fails, the
///   original string is kept as-is.
/// * Any other JSON value is used unchanged.
/// * A missing payload is treated as JSON `null`.
///
/// The resulting value is then converted into `RowData`; when that conversion
/// fails, `RowData::default()` is returned instead of an error.
fn decode_payload_value(payload: Option<serde_json::Value>) -> RowData {
    let value = match payload {
        None => serde_json::Value::Null,
        Some(serde_json::Value::String(text)) => {
            // Try base64 -> JSON; any failure along the way keeps the raw string.
            let embedded_json = STANDARD
                .decode(text.as_bytes())
                .ok()
                .and_then(|bytes| serde_json::from_slice::<serde_json::Value>(&bytes).ok());
            match embedded_json {
                Some(json) => json,
                None => serde_json::Value::String(text),
            }
        },
        Some(other) => other,
    };

    serde_json::from_value(value).unwrap_or_default()
}

#[cfg(test)]
mod tests {
    use super::{
        decode_ack_response, decode_consume_response, AckRequestContext, ConsumeRequestContext,
    };

    /// Builds the request context shared by the consume-response tests.
    fn orders_billing_context() -> ConsumeRequestContext {
        ConsumeRequestContext {
            topic_id: String::from("orders"),
            group_id: String::from("billing"),
            partition_id: 0,
        }
    }

    /// A message without any user field must be rejected with a clear error.
    #[test]
    fn decode_consume_response_requires_user_metadata() {
        let context = orders_billing_context();

        let failure = decode_consume_response(
            r#"{
                "messages": [
                    {
                        "topic_id": "orders",
                        "partition_id": 0,
                        "offset": 9,
                        "payload": {"id": 9}
                    }
                ],
                "next_offset": 10,
                "has_more": false
            }"#,
            &context,
        )
        .unwrap_err();

        assert!(failure.contains("missing required user metadata"));
    }

    /// The `user_id` spelling must be accepted as the message's user.
    #[test]
    fn decode_consume_response_accepts_user_id_alias() {
        let context = orders_billing_context();

        let decoded = decode_consume_response(
            r#"{
                "messages": [
                    {
                        "topic_id": "orders",
                        "partition_id": 0,
                        "offset": 9,
                        "user_id": "producer-42",
                        "payload": {"id": 9}
                    }
                ],
                "next_offset": 10,
                "has_more": false
            }"#,
            &context,
        )
        .expect("consume response with user_id alias should decode");

        assert_eq!(decoded.messages.len(), 1);
        let only_message = &decoded.messages[0];
        assert_eq!(only_message.user.as_str(), "producer-42");
    }

    /// An ack body without an offset reports the offset from the request.
    #[test]
    fn decode_ack_response_defaults_missing_offset_from_request() {
        let context = AckRequestContext { upto_offset: 42 };
        let decoded = decode_ack_response(r#"{"success":true}"#, context)
            .expect("ack response should decode");

        assert!(decoded.success);
        assert_eq!(decoded.acknowledged_offset, 42);
    }
}