rustybook-messenger 0.2.1

Messenger client for Rustybook
Documentation
use serde_json::{
    Value,
    json,
};

use crate::error::MessengerError;
use crate::gateway::lightspeed::codec::encode_request;
use crate::gateway::lightspeed::req::ReqPayload;
use crate::gateway::lightspeed::task::{
    Task,
    TaskLabel,
};
use crate::gateway::lightspeed::topic::Topic;
use crate::state::State;

#[derive(Debug, Clone)]
pub struct BootstrapFrame {
    pub topic: Topic,
    pub payload: Vec<u8>,
    pub request_id: Option<u64>,
}

pub fn build_bootstrap_frames(
    state: &State,
    online: bool,
) -> Result<Vec<BootstrapFrame>, MessengerError> {
    let mut frames = Vec::new();

    let app_settings_payload = serde_json::to_vec(&json!({
        "ls_fdid": "",
        "ls_sv": state.ls_version_id.clone(),
    }))?;
    frames.push(BootstrapFrame {
        topic: Topic::LsAppSettings,
        payload: app_settings_payload,
        request_id: None,
    });

    let foreground_payload = if online { b"1".to_vec() } else { b"0".to_vec() };
    frames.push(BootstrapFrame {
        topic: Topic::LsForegroundState,
        payload: foreground_payload,
        request_id: None,
    });

    // Browser clients publish an empty control frame to /ls_resp during startup.
    frames.push(BootstrapFrame {
        topic: Topic::LsResp,
        payload: Vec::new(),
        request_id: None,
    });

    // Keep request ids and task ids in the same shape as browser captures.
    let mut request_id = 4u64;
    let mut task_id = 0u64;

    // Phase 2: queue bootstrap tasks (thread ranges, contacts, integrity/search/project pulls).
    let thread_range_tasks = build_thread_range_tasks(&mut task_id);
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        thread_range_tasks,
    )?);

    if let Ok(contact_id) = state.user_id.parse::<u64>() {
        let contact_tasks = vec![build_task(
            TaskLabel::AcceptFriendRequest,
            json!({ "contact_id": contact_id }),
            "cpq_v2".to_string(),
            &mut task_id,
        )];
        frames.push(build_type3_frame(
            state,
            &mut request_id,
            rand::random::<u64>(),
            contact_tasks,
        )?);
    }

    let integrity_tasks = vec![build_task(
        TaskLabel::IntegrityCheck,
        json!({
            "cursor": Value::Null,
            "mi_act_mapping_columns_for_integrity_check": [],
            "occam_threads_columns_for_integrity_check": [],
            "threads_ranges_columns_for_integrity_check": [{
                "has_more_before": 0,
                "is_loading_before": 0,
                "min_last_activity_timestamp_ms": 1,
                "min_thread_key": i64::MIN,
                "parent_thread_key": 0,
            }],
            "tam_threads_columns_for_integrity_check": [],
        }),
        "occam-threads-integrity-check".to_string(),
        &mut task_id,
    )];
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        integrity_tasks,
    )?);

    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let search_tasks = vec![build_task(
        TaskLabel::SearchContacts,
        json!({ "limit": 10 }),
        json!(["search_contacts", now_ms.to_string()]).to_string(),
        &mut task_id,
    )];
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        search_tasks,
    )?);

    // Phase 3: initialize LS databases.
    let type2_sync_databases = [1u64, 95, 104];
    for database in type2_sync_databases {
        frames.push(build_type2_frame(
            state,
            &mut request_id,
            database,
            rand::random::<u64>(),
            None,
            false,
        )?);
    }

    let type1_sync_databases = [2u64, 16, 26, 89, 140, 141, 143, 145, 196, 197, 198, 202, 7];
    for database in type1_sync_databases {
        frames.push(build_type1_frame(
            state,
            &mut request_id,
            database,
            rand::random::<u64>(),
        )?);
    }

    // Phase 4: follow-up pulls for high-churn DBs.
    let follow_up_type2_databases = [2u64, 16];
    for database in follow_up_type2_databases {
        frames.push(build_type2_frame(
            state,
            &mut request_id,
            database,
            rand::random::<u64>(),
            None,
            false,
        )?);
    }

    let contact_refresh_tasks = vec![build_task(
        TaskLabel::AcceptFriendRequest,
        json!({ "contact_id": state.user_id.parse::<u64>().ok() }),
        "cpq_v2".to_string(),
        &mut task_id,
    )];
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        contact_refresh_tasks,
    )?);

    let mailbox_pull_tasks = vec![build_task(
        TaskLabel::PullMailbox,
        json!({
            "table": "mailbox",
            "type": 1,
            "1": state.user_id.parse::<u64>().ok(),
            "2": now_ms.to_string(),
        }),
        "ppq".to_string(),
        &mut task_id,
    )];
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        mailbox_pull_tasks,
    )?);

    for tasks in build_project_sync_tasks(&mut task_id) {
        frames.push(build_type3_frame(
            state,
            &mut request_id,
            rand::random::<u64>(),
            tasks,
        )?);
    }

    let database_resync = [16u64, 2, 16];
    for database in database_resync {
        frames.push(build_type2_frame(
            state,
            &mut request_id,
            database,
            rand::random::<u64>(),
            None,
            false,
        )?);
    }

    let read_state_tasks = vec![build_task(
        TaskLabel::MarkRead,
        json!({
            "thread_id": state.user_id.parse::<u64>().ok(),
            "last_read_watermark_ts": now_ms,
            "sync_group": 1,
        }),
        state.user_id.clone(),
        &mut task_id,
    )];
    frames.push(build_type3_frame(
        state,
        &mut request_id,
        rand::random::<u64>(),
        read_state_tasks,
    )?);

    let final_resync = [16u64, 2];
    for database in final_resync {
        frames.push(build_type2_frame(
            state,
            &mut request_id,
            database,
            rand::random::<u64>(),
            None,
            false,
        )?);
    }

    Ok(frames)
}

fn build_task(label: TaskLabel, payload: Value, queue_name: String, task_id: &mut u64) -> Task {
    Task::new(label, payload, queue_name, next_task_id(task_id))
}

fn build_project_sync_tasks(task_id: &mut u64) -> Vec<Vec<Task>> {
    let projects = [
        "LS_MediaReceiverFetch",
        "LS_MsgrXmaReceiverFetch",
        "LS_StickerSearch",
    ];
    projects
        .iter()
        .map(|project_name| {
            vec![build_task(
                TaskLabel::ProjectSync,
                json!({
                    "project_name": project_name,
                }),
                format!("acs_sync_{project_name}"),
                task_id,
            )]
        })
        .collect()
}

fn build_thread_range_tasks(task_id: &mut u64) -> Vec<Task> {
    let mut tasks = Vec::new();
    let variants: [(i64, i64); 5] = [(-12, 1), (-12, 95), (-1, 1), (-1, 95), (-28, 1)];
    for (parent_thread_key, sync_group) in variants {
        tasks.push(build_task(
            TaskLabel::ThreadRangeQuery,
            json!({
                "is_after": 0,
                "parent_thread_key": parent_thread_key,
                "reference_thread_key": 0,
                "reference_activity_timestamp": 9_999_999_999_999u64,
                "additional_pages_to_fetch": 0,
                "cursor": Value::Null,
                "messaging_tag": Value::Null,
                "sync_group": sync_group,
            }),
            "trq".to_string(),
            task_id,
        ));
    }
    tasks
}

fn build_type3_frame(
    state: &State,
    request_id: &mut u64,
    epoch_id: u64,
    tasks: Vec<Task>,
) -> Result<BootstrapFrame, MessengerError> {
    let request_id_value = next_request_id(request_id);
    let req = ReqPayload::new_task_batch(
        state.ls_app_id.clone(),
        request_id_value,
        epoch_id,
        state.ls_version_id.clone(),
        tasks,
    );
    let payload = encode_request(&req)?;
    Ok(BootstrapFrame {
        topic: Topic::LsReq,
        payload,
        request_id: Some(request_id_value),
    })
}

fn build_type2_frame(
    state: &State,
    request_id: &mut u64,
    database: u64,
    epoch: u64,
    last_applied_cursor: Option<String>,
    include_sync_params: bool,
) -> Result<BootstrapFrame, MessengerError> {
    let request_id_value = next_request_id(request_id);
    let version = state.ls_version_id.parse::<u64>().unwrap_or_default();
    let sync_params = if include_sync_params {
        Some(json!({"locale":"en_US"}))
    } else {
        None
    };

    let req = ReqPayload::new_cursor_sync(
        state.ls_app_id.clone(),
        request_id_value,
        database,
        epoch,
        last_applied_cursor,
        sync_params,
        version,
    );
    let payload = encode_request(&req)?;
    Ok(BootstrapFrame {
        topic: Topic::LsReq,
        payload,
        request_id: Some(request_id_value),
    })
}

fn build_type1_frame(
    state: &State,
    request_id: &mut u64,
    database: u64,
    epoch: u64,
) -> Result<BootstrapFrame, MessengerError> {
    let request_id_value = next_request_id(request_id);
    let version = state.ls_version_id.parse::<u64>().unwrap_or_default();
    let req = ReqPayload::new_init_sync(
        state.ls_app_id.clone(),
        request_id_value,
        database,
        epoch,
        version,
    );
    let payload = encode_request(&req)?;
    Ok(BootstrapFrame {
        topic: Topic::LsReq,
        payload,
        request_id: Some(request_id_value),
    })
}

fn next_task_id(task_id: &mut u64) -> u64 {
    let current = *task_id;
    *task_id = task_id.saturating_add(1);
    current
}

fn next_request_id(request_id: &mut u64) -> u64 {
    let current = *request_id;
    *request_id = request_id.wrapping_add(1);
    current
}