rustybook-messenger 0.2.1

Messenger client for Rustybook
Documentation
use serde_json::Value;

use crate::gateway::lightspeed::request_id::RequestId;
use crate::gateway::lightspeed::task::Task;

/// Kind of request, identified on the wire by an integer code.
///
/// Codes `1` through `4` correspond to the named variants. Any other code is
/// preserved verbatim inside [`ReqType::Unknown`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReqType {
    /// Code `1`, named `"init_sync"`.
    InitSync,
    /// Code `2`, named `"cursor_sync"`.
    CursorSync,
    /// Code `3`, named `"task_batch"`.
    TaskBatch,
    /// Code `4`, named `"direct_operation"`.
    DirectOperation,
    /// Any unrecognised code. `-1` is stored when no code was supplied.
    Unknown(i64),
}

impl ReqType {
    /// Converts an optional integer code into a `ReqType`.
    ///
    /// A missing code (`None`) becomes `Unknown(-1)`, which is the same value
    /// an explicit `Some(-1)` produces.
    pub fn parse(value: Option<i64>) -> Self {
        match value.unwrap_or(-1) {
            1 => Self::InitSync,
            2 => Self::CursorSync,
            3 => Self::TaskBatch,
            4 => Self::DirectOperation,
            code => Self::Unknown(code),
        }
    }

    /// Returns the integer code for this request type.
    ///
    /// For [`ReqType::Unknown`] this is the code that was stored in it.
    pub fn value(&self) -> i64 {
        match *self {
            Self::Unknown(code) => code,
            Self::InitSync => 1,
            Self::CursorSync => 2,
            Self::TaskBatch => 3,
            Self::DirectOperation => 4,
        }
    }

    /// Returns a fixed snake_case label for this request type.
    ///
    /// Every [`ReqType::Unknown`] maps to `"unknown"` regardless of its code.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Unknown(_) => "unknown",
            Self::DirectOperation => "direct_operation",
            Self::TaskBatch => "task_batch",
            Self::CursorSync => "cursor_sync",
            Self::InitSync => "init_sync",
        }
    }
}

/// Body of an init-sync request (`ReqType::InitSync`).
#[derive(Debug, Clone)]
pub struct InitSyncPayload {
    /// Value of the payload's `database` key, when present and convertible to `u64`.
    pub database: Option<u64>,
    /// Value of the payload's `version` key, when present and convertible to `u64`.
    pub version: Option<u64>,
    /// Complete payload object. `ReqPayload::to_json` serializes this value
    /// (not the typed fields above) for an init-sync body.
    pub raw: Value,
}

/// Body of a cursor-sync request (`ReqType::CursorSync`).
#[derive(Debug, Clone)]
pub struct CursorSyncPayload {
    /// Value of the payload's `database` key, when present and convertible to `u64`.
    pub database: Option<u64>,
    /// Value of the payload's `epoch` key, when present and convertible to `u64`.
    pub epoch: Option<u64>,
    /// Value of the payload's `last_applied_cursor` key; only populated by
    /// `ReqPayload::parse` when that key holds a JSON string.
    pub last_applied_cursor: Option<String>,
    /// Value of the payload's `sync_params` key, kept as untyped JSON.
    pub sync_params: Option<Value>,
    /// Value of the payload's `version` key, when present and convertible to `u64`.
    pub version: Option<u64>,
    /// Complete payload object. `ReqPayload::to_json` serializes this value
    /// (not the typed fields above) for a cursor-sync body.
    pub raw: Value,
}

/// Body of a task-batch request (`ReqType::TaskBatch`).
#[derive(Debug, Clone)]
pub struct TaskBatchPayload {
    /// Value of the payload's `epoch_id` key as text (a JSON string, or an
    /// integer rendered in decimal).
    pub epoch_id: Option<String>,
    /// Value of the payload's `version_id` key as text (a JSON string, or an
    /// integer rendered in decimal).
    pub version_id: Option<String>,
    /// Tasks carried by the batch. When parsed, entries of the `tasks` array
    /// that `Task::parse` rejects are dropped.
    pub tasks: Vec<Task>,
    /// Decoded payload as seen by `ReqPayload::parse`; `Value::Null` when built
    /// with `ReqPayload::new_task_batch`. `ReqPayload::to_json` does not read
    /// this field for task batches; it rebuilds the payload from the typed fields.
    pub raw: Value,
}

/// Body of a direct-operation request (`ReqType::DirectOperation`).
#[derive(Debug, Clone)]
pub struct DirectOperationPayload {
    /// Value of the payload's `label` key as text (a JSON string, or an
    /// integer rendered in decimal).
    pub label: Option<String>,
    /// Value of the payload's `version` key as text (a JSON string, or an
    /// integer rendered in decimal).
    pub version: Option<String>,
    /// Inner `payload` value. When parsed from a string that itself contains
    /// valid JSON, this holds the decoded JSON rather than the string.
    pub payload: Value,
    /// Decoded outer payload as seen by `ReqPayload::parse`.
    /// `ReqPayload::to_json` does not read this field for direct operations.
    pub raw: Value,
}

/// A request envelope: application id, request id, type code and typed body.
#[derive(Debug, Clone)]
pub struct ReqPayload {
    /// Value of the root `app_id` key, when it is a JSON string.
    pub app_id: Option<String>,
    /// Identifier of the request (root `request_id`, or `requestId` as a fallback).
    pub request_id: RequestId,
    /// Request type derived from the root `type` key.
    pub req_type: ReqType,
    /// Typed body selected according to `req_type`.
    pub payload: ReqBody,
    /// Root JSON value handed to `ReqPayload::parse`; `Value::Null` for
    /// envelopes built through the `new_*` constructors.
    pub raw: Value,
}

/// Typed request body, one variant per known request type plus a raw fallback.
#[derive(Debug, Clone)]
pub enum ReqBody {
    /// MQTT LS request type=1 (DB initialization pull)
    InitSync(InitSyncPayload),
    /// MQTT LS request type=2 (cursor-based DB sync pull)
    CursorSync(CursorSyncPayload),
    /// MQTT LS request type=3 (task-batch operation)
    TaskBatch(TaskBatchPayload),
    /// MQTT LS request type=4 (single-op request, e.g. typing)
    DirectOperation(DirectOperationPayload),
    /// Decoded payload of a request whose type is not one of the four known
    /// codes, kept as untyped JSON.
    Raw(Value),
}

impl ReqPayload {
    pub fn new_task_batch(
        app_id: impl Into<String>,
        request_id: u64,
        epoch_id: u64,
        version_id: impl Into<String>,
        tasks: Vec<Task>,
    ) -> Self {
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::TaskBatch,
            payload: ReqBody::TaskBatch(TaskBatchPayload {
                epoch_id: Some(epoch_id.to_string()),
                version_id: Some(version_id.into()),
                tasks,
                raw: Value::Null,
            }),
            raw: Value::Null,
        }
    }

    pub fn new_cursor_sync(
        app_id: impl Into<String>,
        request_id: u64,
        database: u64,
        epoch: u64,
        last_applied_cursor: Option<String>,
        sync_params: Option<Value>,
        version: u64,
    ) -> Self {
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::CursorSync,
            payload: ReqBody::CursorSync(CursorSyncPayload {
                database: Some(database),
                epoch: Some(epoch),
                last_applied_cursor: last_applied_cursor.clone(),
                sync_params: sync_params.clone(),
                version: Some(version),
                raw: serde_json::json!({
                    "database": database,
                    "epoch": epoch,
                    "failure_count": Value::Null,
                    "last_applied_cursor": last_applied_cursor,
                    "sync_params": sync_params,
                    "version": version,
                }),
            }),
            raw: Value::Null,
        }
    }

    pub fn new_init_sync(
        app_id: impl Into<String>,
        request_id: u64,
        database: u64,
        epoch: u64,
        version: u64,
    ) -> Self {
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::InitSync,
            payload: ReqBody::InitSync(InitSyncPayload {
                database: Some(database),
                version: Some(version),
                raw: serde_json::json!({
                    "database": database,
                    "epoch": epoch,
                    "failure_count": Value::Null,
                    "last_applied_cursor": Value::Null,
                    "sync_params": serde_json::json!({"locale":"en_US"}).to_string(),
                    "version": version,
                }),
            }),
            raw: Value::Null,
        }
    }

    pub fn parse(root: Value) -> Self {
        let app_id = root
            .get("app_id")
            .and_then(Value::as_str)
            .map(ToString::to_string);
        let request_id = RequestId::parse(root.get("request_id").or_else(|| root.get("requestId")));
        let req_type = ReqType::parse(root.get("type").and_then(Value::as_i64));
        let payload_value = decode_payload_field(root.get("payload"));

        let payload = match req_type {
            ReqType::InitSync => ReqBody::InitSync(InitSyncPayload {
                database: payload_value.get("database").and_then(value_to_u64),
                version: payload_value.get("version").and_then(value_to_u64),
                raw: payload_value.clone(),
            }),
            ReqType::CursorSync => ReqBody::CursorSync(CursorSyncPayload {
                database: payload_value.get("database").and_then(value_to_u64),
                epoch: payload_value.get("epoch").and_then(value_to_u64),
                last_applied_cursor: payload_value
                    .get("last_applied_cursor")
                    .and_then(Value::as_str)
                    .map(ToString::to_string),
                sync_params: payload_value.get("sync_params").cloned(),
                version: payload_value.get("version").and_then(value_to_u64),
                raw: payload_value.clone(),
            }),
            ReqType::TaskBatch => {
                let tasks = payload_value
                    .get("tasks")
                    .and_then(Value::as_array)
                    .map(|items| items.iter().filter_map(Task::parse).collect())
                    .unwrap_or_default();
                ReqBody::TaskBatch(TaskBatchPayload {
                    epoch_id: payload_value.get("epoch_id").and_then(value_to_string),
                    version_id: payload_value.get("version_id").and_then(value_to_string),
                    tasks,
                    raw: payload_value.clone(),
                })
            }
            ReqType::DirectOperation => ReqBody::DirectOperation(DirectOperationPayload {
                label: payload_value.get("label").and_then(value_to_string),
                version: payload_value.get("version").and_then(value_to_string),
                payload: decode_payload_field(payload_value.get("payload")),
                raw: payload_value.clone(),
            }),
            ReqType::Unknown(_) => ReqBody::Raw(payload_value.clone()),
        };

        Self {
            app_id,
            request_id,
            req_type,
            payload,
            raw: root,
        }
    }

    pub fn to_json(&self) -> Value {
        let payload = match &self.payload {
            ReqBody::InitSync(value) => value.raw.clone(),
            ReqBody::CursorSync(value) => value.raw.clone(),
            ReqBody::TaskBatch(value) => {
                let tasks: Vec<Value> = value.tasks.iter().map(Task::encode_value).collect();
                serde_json::json!({
                    "epoch_id": value.epoch_id,
                    "version_id": value.version_id,
                    "tasks": tasks,
                })
            }
            ReqBody::DirectOperation(value) => serde_json::json!({
                "label": value.label,
                "version": value.version,
                "payload": if value.payload.is_string() {
                    value.payload.clone()
                } else {
                    Value::String(value.payload.to_string())
                }
            }),
            ReqBody::Raw(value) => value.clone(),
        };

        serde_json::json!({
            "app_id": self.app_id,
            "request_id": self.request_id.value_json(),
            "type": self.req_type.value(),
            "payload": payload.to_string(),
        })
    }
}

/// Returns an owned, decoded copy of an optional payload field.
///
/// * A missing field becomes `Value::Null`.
/// * A string is parsed as JSON; if parsing fails the string is kept as is.
/// * Any other value is cloned unchanged.
fn decode_payload_field(value: Option<&Value>) -> Value {
    value.map_or(Value::Null, |field| match field {
        Value::String(text) => {
            // Payloads may arrive as JSON text inside a string; fall back to
            // the original string when the text is not valid JSON.
            serde_json::from_str::<Value>(text).unwrap_or_else(|_| field.clone())
        }
        other => other.clone(),
    })
}

fn value_to_string(value: &Value) -> Option<String> {
    value
        .as_str()
        .map(ToString::to_string)
        .or_else(|| value.as_u64().map(|number| number.to_string()))
        .or_else(|| value.as_i64().map(|number| number.to_string()))
}

fn value_to_u64(value: &Value) -> Option<u64> {
    value
        .as_u64()
        .or_else(|| value.as_i64().and_then(|number| u64::try_from(number).ok()))
        .or_else(|| value.as_str().and_then(|text| text.parse::<u64>().ok()))
}