rustybook-messenger 0.2.1

Messenger client for Rustybook
Documentation
use std::collections::HashSet;

use serde_json::Value;
use tracing::trace;

use super::legacy::{
    parse_message_delta,
    value_as_i64,
    value_as_string,
};
use crate::error::MessengerError;
use crate::gateway::events::{
    Event,
    MessageEvent,
    TypingEvent,
};
use crate::gateway::lightspeed::codec::decode_envelope;
use crate::gateway::lightspeed::op::RespOp;
use crate::gateway::lightspeed::payload::Payload;

pub(super) fn parse_lightspeed(payload: &[u8]) -> Result<Vec<Event>, MessengerError> {
    trace!(payload_len = payload.len(), "parsing lightspeed payload");
    let envelope = decode_envelope("/ls_resp", payload)?;
    let mut output = Vec::new();

    trace!(
        request_id_type = envelope.request_id.name(),
        request_id_int = envelope.request_id.as_u64(),
        target = envelope.target,
        sp_count = envelope.sp.len(),
        "parsed lightspeed envelope"
    );

    if let Payload::Response(response) = &envelope.payload {
        trace!(
            name = response.name.as_deref().unwrap_or(""),
            operation_count = response.operations.len(),
            "parsed lightspeed response payload"
        );
        for operation in &response.operations {
            match operation {
                RespOp::UpdateTypingIndicator {
                    thread_id,
                    user_id,
                    is_typing,
                } => output.push(Event::Typing(TypingEvent {
                    user_id: user_id.clone(),
                    thread_id: Some(thread_id.clone()),
                    is_typing: *is_typing,
                })),
                RespOp::InsertMessage {
                    thread_id,
                    sender_id,
                    message_id,
                    text,
                    timestamp_ms,
                } => output.push(Event::Message(MessageEvent {
                    message_id: Some(message_id.clone()),
                    thread_id: thread_id.clone(),
                    sender_id: sender_id.clone(),
                    text: text.clone(),
                    timestamp_ms: *timestamp_ms,
                })),
                _ => {}
            }
        }
    }

    let mut messages = Vec::new();
    collect_lightspeed_messages(&envelope.raw, 0, &mut messages);
    trace!(
        candidate_messages = messages.len(),
        "collected lightspeed message candidates"
    );

    let mut dedup = HashSet::new();

    for message in messages {
        let key = format!(
            "{}|{}|{}|{}|{}",
            message.thread_id,
            message.sender_id,
            message.message_id.as_deref().unwrap_or(""),
            message.timestamp_ms.unwrap_or_default(),
            message.text.as_deref().unwrap_or("")
        );

        if dedup.insert(key) {
            output.push(Event::Message(message));
        }
    }

    trace!(events = output.len(), "parsed lightspeed events");
    Ok(output)
}

/// Recursively walks `value` and appends every message-like node to `messages`.
///
/// Each visited node is tried with `parse_message_delta` first and with
/// `parse_lightspeed_message` second; at most one event is recorded per node.
/// The walk then descends into object values, array elements, and strings
/// that themselves hold a JSON object or array. Nodes at a depth greater
/// than ten are skipped.
fn collect_lightspeed_messages(value: &Value, depth: usize, messages: &mut Vec<MessageEvent>) {
    const MAX_DEPTH: usize = 10;
    if depth > MAX_DEPTH {
        return;
    }

    // Record the node itself before looking at its children.
    let found = parse_message_delta(value).or_else(|| parse_lightspeed_message(value));
    messages.extend(found);

    let next_depth = depth + 1;
    match value {
        Value::Object(fields) => fields
            .values()
            .for_each(|child| collect_lightspeed_messages(child, next_depth, messages)),
        Value::Array(items) => items
            .iter()
            .for_each(|child| collect_lightspeed_messages(child, next_depth, messages)),
        Value::String(raw) => {
            // Strings may carry an embedded JSON document; scan that as well.
            if let Some(embedded) = parse_string_json(raw) {
                collect_lightspeed_messages(&embedded, next_depth, messages);
            }
        }
        _ => {}
    }
}

/// Tries to interpret a JSON node as a message event.
///
/// Several alternative key names are accepted for every field; for each field
/// the first listed key that is present in `value` is the one that is read.
/// Returns `None` when no thread id can be read, or when the sender id is
/// empty and the node carries neither a message id nor text.
fn parse_lightspeed_message(value: &Value) -> Option<MessageEvent> {
    /// Returns the value stored under the first of `keys` that exists in `node`.
    fn first_present<'a>(node: &'a Value, keys: &[&str]) -> Option<&'a Value> {
        keys.iter().find_map(|key| node.get(*key))
    }

    // A thread id is mandatory; bail out early without one.
    let thread_id = first_present(value, &["thread_id", "thread", "thread_fbid"])
        .and_then(value_as_string)?;

    let sender_id = first_present(
        value,
        &["sender_id", "actor_id", "author", "from", "sender_fbid"],
    )
    .and_then(value_as_string)
    .unwrap_or_default();

    let message_id = first_present(
        value,
        &["message_id", "messageId", "mid", "offline_threading_id"],
    )
    .and_then(value_as_string);

    let text = first_present(value, &["text", "body", "snippet"]).and_then(value_as_string);

    let timestamp_ms = first_present(value, &["timestamp_ms", "timestamp", "message_timestamp"])
        .and_then(value_as_i64);

    // A bare thread id is not enough to count as a message.
    let has_content = !sender_id.is_empty() || message_id.is_some() || text.is_some();
    has_content.then(|| MessageEvent {
        message_id,
        thread_id,
        sender_id,
        text,
        timestamp_ms,
    })
}

/// Parses `raw` as JSON when it looks like an embedded object or array.
///
/// Surrounding whitespace is ignored. Returns `None` if the trimmed text does
/// not begin with `{` or `[`, or if it fails to parse as JSON.
fn parse_string_json(raw: &str) -> Option<Value> {
    let candidate = raw.trim();
    match candidate.as_bytes().first() {
        // Only attempt a parse for text that opens an object or an array.
        Some(b'{') | Some(b'[') => serde_json::from_str(candidate).ok(),
        _ => None,
    }
}