use std::collections::HashMap;
use std::sync::Arc;
use serde_json::Value;
use tokio::sync::{
Mutex,
oneshot,
};
use tracing::trace;
use crate::error::MessengerError;
use crate::gateway::lightspeed::codec::decode_envelope;
use crate::gateway::lightspeed::req::{
ReqBody,
ReqPayload,
ReqType,
TaskBatchPayload,
};
use crate::gateway::lightspeed::request_id::RequestId;
use crate::gateway::lightspeed::task::{
Task,
TaskLabel,
};
use crate::model::SendReceipt;
#[derive(Debug, Clone)]
pub struct PendingRequests {
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<LsResponse>>>>,
}
#[derive(Debug)]
pub struct LsResponse {
pub request_id: u64,
pub payload: Value,
}
impl PendingRequests {
pub fn new() -> Self {
Self {
pending: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn register(&self, request_id: u64) -> oneshot::Receiver<LsResponse> {
let (tx, rx) = oneshot::channel();
let mut pending = self.pending.lock().await;
pending.insert(request_id, tx);
rx
}
pub async fn resolve_payload(&self, payload: &[u8]) -> Result<(), MessengerError> {
trace!(
payload_len = payload.len(),
"resolving lightspeed response payload"
);
let envelope = decode_envelope("/ls_resp", payload)?;
let Some(request_id) = envelope.request_id.as_u64() else {
trace!("lightspeed response payload has no request_id");
return Ok(());
};
trace!(request_id, "lightspeed response payload has request_id");
let mut pending = self.pending.lock().await;
if let Some(tx) = pending.remove(&request_id) {
let _ = tx.send(LsResponse {
request_id,
payload: envelope.raw,
});
trace!(request_id, "resolved pending lightspeed request");
} else {
trace!(request_id, "no pending lightspeed request to resolve");
}
Ok(())
}
}
pub fn build_send_text_payload(
request_id: u64,
task_id: u64,
thread_id: &str,
text: &str,
app_id: &str,
version_id: &str,
) -> ReqPayload {
let offline_threading_id = generate_offline_threading_id();
let task = Task {
label: TaskLabel::SendMessage,
label_code: TaskLabel::SendMessage.code().to_string(),
payload: serde_json::json!({
"thread_id": thread_id,
"otid": offline_threading_id,
"source": 0,
"send_type": 1,
"sync_group": 1,
"text": text,
"initiating_source": 0,
}),
queue_name: thread_id.to_string(),
task_id: Some(task_id),
failure_count: None,
raw: Value::Null,
};
let payload = ReqBody::TaskBatch(TaskBatchPayload {
epoch_id: Some(offline_threading_id),
version_id: Some(version_id.to_string()),
tasks: vec![task],
raw: Value::Null,
});
ReqPayload {
app_id: Some(app_id.to_string()),
request_id: RequestId::Int(request_id),
req_type: ReqType::TaskBatch,
payload,
raw: Value::Null,
}
}
pub fn extract_send_receipt(thread_id: &str, payload: Value) -> SendReceipt {
let message_id = payload
.get("payload")
.and_then(|value| value.get("message_id"))
.and_then(Value::as_str)
.map(ToString::to_string)
.or_else(|| {
payload
.get("message_id")
.and_then(Value::as_str)
.map(ToString::to_string)
});
let offline_threading_id = payload
.get("payload")
.and_then(|value| value.get("offline_threading_id"))
.and_then(Value::as_str)
.map(ToString::to_string)
.unwrap_or_else(generate_offline_threading_id);
SendReceipt {
thread_id: thread_id.to_string(),
message_id,
offline_threading_id,
}
}
fn generate_offline_threading_id() -> String {
use std::time::{
SystemTime,
UNIX_EPOCH,
};
let timestamp_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let random: u128 = rand::random::<u32>() as u128;
let value = (timestamp_ms << 22) | random;
value.to_string()
}