//! rustybook-messenger 0.2.1
//!
//! Messenger client for Rustybook.
use std::collections::HashMap;
use std::sync::Arc;

use serde_json::Value;
use tokio::sync::{
    Mutex,
    oneshot,
};
use tracing::trace;

use crate::error::MessengerError;
use crate::gateway::lightspeed::codec::decode_envelope;
use crate::gateway::lightspeed::req::{
    ReqBody,
    ReqPayload,
    ReqType,
    TaskBatchPayload,
};
use crate::gateway::lightspeed::request_id::RequestId;
use crate::gateway::lightspeed::task::{
    Task,
    TaskLabel,
};
use crate::model::SendReceipt;

/// Table of in-flight requests awaiting a response.
///
/// Each entry maps a request id to the `oneshot` sender through which the
/// matching [`LsResponse`] is delivered. The map sits behind an
/// `Arc<Mutex<..>>`, so clones of this handle share the same table.
#[derive(Debug, Clone)]
pub struct PendingRequests {
    // Request id -> sender half of the channel handed out by `register`.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<LsResponse>>>>,
}

/// Response delivered to the waiter of a pending request.
#[derive(Debug)]
pub struct LsResponse {
    /// Id of the request this response answers.
    pub request_id: u64,
    /// JSON value taken from the decoded envelope's `raw` field.
    pub payload: Value,
}

impl PendingRequests {
    /// Creates an empty table of pending requests.
    pub fn new() -> Self {
        Self {
            pending: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Registers `request_id` as awaiting a response and returns the receiver
    /// on which that response will be delivered.
    ///
    /// Registering an id that is already present replaces the previous sender;
    /// the previous receiver then observes a closed channel.
    pub async fn register(&self, request_id: u64) -> oneshot::Receiver<LsResponse> {
        let (tx, rx) = oneshot::channel();
        let mut pending = self.pending.lock().await;
        // Evict entries whose receiver has already been dropped (for example a
        // caller that gave up waiting). Without this, requests that never get
        // a response would stay in the map forever.
        pending.retain(|_, waiter| !waiter.is_closed());
        pending.insert(request_id, tx);
        rx
    }

    /// Decodes a response payload and hands it to the waiter registered for
    /// its request id, if there is one.
    ///
    /// Payloads without a numeric request id, and payloads whose request id
    /// has no registered waiter, are ignored and reported as `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `decode_envelope` when the payload
    /// cannot be decoded.
    pub async fn resolve_payload(&self, payload: &[u8]) -> Result<(), MessengerError> {
        trace!(
            payload_len = payload.len(),
            "resolving lightspeed response payload"
        );
        let envelope = decode_envelope("/ls_resp", payload)?;
        let Some(request_id) = envelope.request_id.as_u64() else {
            trace!("lightspeed response payload has no request_id");
            return Ok(());
        };
        trace!(request_id, "lightspeed response payload has request_id");

        // The guard is a temporary of this statement, so the lock is released
        // before the response is delivered.
        let waiter = self.pending.lock().await.remove(&request_id);
        let Some(tx) = waiter else {
            trace!(request_id, "no pending lightspeed request to resolve");
            return Ok(());
        };

        let response = LsResponse {
            request_id,
            payload: envelope.raw,
        };
        // `send` fails only when the receiver has been dropped; that is not an
        // error for the caller, but it should not be logged as a success.
        if tx.send(response).is_ok() {
            trace!(request_id, "resolved pending lightspeed request");
        } else {
            trace!(
                request_id,
                "pending lightspeed request was abandoned before its response arrived"
            );
        }

        Ok(())
    }
}

/// Builds the request payload for sending a plain-text message to a thread.
///
/// The result is a `TaskBatch` request containing a single `SendMessage`
/// task. A freshly generated offline threading id is used both as the task's
/// `otid` and as the batch's `epoch_id`.
///
/// * `request_id` - id placed on the outgoing request.
/// * `task_id` - id assigned to the single task in the batch.
/// * `thread_id` - target thread; also used as the task's queue name.
/// * `text` - message body.
/// * `app_id` / `version_id` - copied into the request and batch respectively.
pub fn build_send_text_payload(
    request_id: u64,
    task_id: u64,
    thread_id: &str,
    text: &str,
    app_id: &str,
    version_id: &str,
) -> ReqPayload {
    let otid = generate_offline_threading_id();

    // JSON body carried by the send-message task.
    let message_body = serde_json::json!({
        "thread_id": thread_id,
        "otid": otid,
        "source": 0,
        "send_type": 1,
        "sync_group": 1,
        "text": text,
        "initiating_source": 0,
    });

    let send_task = Task {
        label: TaskLabel::SendMessage,
        label_code: TaskLabel::SendMessage.code().to_string(),
        payload: message_body,
        queue_name: String::from(thread_id),
        task_id: Some(task_id),
        failure_count: None,
        raw: Value::Null,
    };

    let batch = TaskBatchPayload {
        epoch_id: Some(otid),
        version_id: Some(String::from(version_id)),
        tasks: vec![send_task],
        raw: Value::Null,
    };

    ReqPayload {
        app_id: Some(String::from(app_id)),
        request_id: RequestId::Int(request_id),
        req_type: ReqType::TaskBatch,
        payload: ReqBody::TaskBatch(batch),
        raw: Value::Null,
    }
}

pub fn extract_send_receipt(thread_id: &str, payload: Value) -> SendReceipt {
    let message_id = payload
        .get("payload")
        .and_then(|value| value.get("message_id"))
        .and_then(Value::as_str)
        .map(ToString::to_string)
        .or_else(|| {
            payload
                .get("message_id")
                .and_then(Value::as_str)
                .map(ToString::to_string)
        });

    let offline_threading_id = payload
        .get("payload")
        .and_then(|value| value.get("offline_threading_id"))
        .and_then(Value::as_str)
        .map(ToString::to_string)
        .unwrap_or_else(generate_offline_threading_id);

    SendReceipt {
        thread_id: thread_id.to_string(),
        message_id,
        offline_threading_id,
    }
}

/// Generates an offline threading id as a decimal string.
///
/// The id is `(unix_time_ms << 22) | random`, where `random` is limited to
/// the 22 low-order bits freed by the shift. If the system clock reports a
/// time before the Unix epoch, the timestamp component is zero.
fn generate_offline_threading_id() -> String {
    use std::time::{
        SystemTime,
        UNIX_EPOCH,
    };

    // Width of the random component; must match the shift applied to the timestamp.
    const RANDOM_BITS: u32 = 22;
    const RANDOM_MASK: u128 = (1 << RANDOM_BITS) - 1;

    let timestamp_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    // Keep only the low 22 bits: an unmasked 32-bit value would overwrite the
    // lowest 10 bits of the shifted timestamp.
    let random = u128::from(rand::random::<u32>()) & RANDOM_MASK;

    ((timestamp_ms << RANDOM_BITS) | random).to_string()
}