use serde_json::Value;
use crate::gateway::lightspeed::request_id::RequestId;
use crate::gateway::lightspeed::task::Task;
/// Numeric request-type discriminator.
///
/// Codes `1` through `4` map to the named variants; every other code is kept
/// verbatim inside [`ReqType::Unknown`] so it can be written back unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReqType {
    /// Code `1`.
    InitSync,
    /// Code `2`.
    CursorSync,
    /// Code `3`.
    TaskBatch,
    /// Code `4`.
    DirectOperation,
    /// Any other code. [`ReqType::parse`] stores `-1` here when no code was supplied.
    Unknown(i64),
}

impl ReqType {
    /// Converts an optional numeric code into a request type.
    ///
    /// `None` is treated exactly like `Some(-1)`: both produce `Unknown(-1)`.
    pub fn parse(value: Option<i64>) -> Self {
        // Collapse the "missing" case onto -1 first so a single match handles everything.
        match value.unwrap_or(-1) {
            1 => Self::InitSync,
            2 => Self::CursorSync,
            3 => Self::TaskBatch,
            4 => Self::DirectOperation,
            other => Self::Unknown(other),
        }
    }

    /// Returns the numeric code for this request type.
    ///
    /// For [`ReqType::Unknown`] this is the stored code, whatever it is.
    pub fn value(&self) -> i64 {
        match *self {
            Self::InitSync => 1,
            Self::CursorSync => 2,
            Self::TaskBatch => 3,
            Self::DirectOperation => 4,
            Self::Unknown(code) => code,
        }
    }

    /// Returns a fixed snake_case label for this request type.
    ///
    /// All [`ReqType::Unknown`] codes share the label `"unknown"`.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::InitSync => "init_sync",
            Self::CursorSync => "cursor_sync",
            Self::TaskBatch => "task_batch",
            Self::DirectOperation => "direct_operation",
            Self::Unknown(_) => "unknown",
        }
    }
}
/// Body of a request whose type is `ReqType::InitSync`.
#[derive(Debug, Clone)]
pub struct InitSyncPayload {
/// `database` entry of the payload; `None` when parsing found no value
/// convertible to `u64`.
pub database: Option<u64>,
/// `version` entry of the payload; `None` when parsing found no value
/// convertible to `u64`.
pub version: Option<u64>,
/// Payload JSON this struct was built from. `ReqPayload::to_json` serializes
/// this value, not the typed fields above.
pub raw: Value,
}
/// Body of a request whose type is `ReqType::CursorSync`.
#[derive(Debug, Clone)]
pub struct CursorSyncPayload {
/// `database` entry of the payload; `None` when parsing found no value
/// convertible to `u64`.
pub database: Option<u64>,
/// `epoch` entry of the payload; `None` when parsing found no value
/// convertible to `u64`.
pub epoch: Option<u64>,
/// `last_applied_cursor` entry of the payload; only JSON strings are accepted
/// when parsing.
pub last_applied_cursor: Option<String>,
/// `sync_params` entry of the payload, kept as whatever JSON value was supplied.
pub sync_params: Option<Value>,
/// `version` entry of the payload; `None` when parsing found no value
/// convertible to `u64`.
pub version: Option<u64>,
/// Payload JSON this struct was built from. `ReqPayload::to_json` serializes
/// this value, not the typed fields above.
pub raw: Value,
}
/// Body of a request whose type is `ReqType::TaskBatch`.
#[derive(Debug, Clone)]
pub struct TaskBatchPayload {
/// `epoch_id` entry of the payload; JSON strings are taken as-is and JSON
/// integers are rendered in decimal when parsing.
pub epoch_id: Option<String>,
/// `version_id` entry of the payload; JSON strings are taken as-is and JSON
/// integers are rendered in decimal when parsing.
pub version_id: Option<String>,
/// Tasks carried by the batch.
pub tasks: Vec<Task>,
/// Payload JSON as parsed, or `Value::Null` when built by
/// `ReqPayload::new_task_batch`. `ReqPayload::to_json` does not read it for
/// task batches; it re-encodes the typed fields instead.
pub raw: Value,
}
/// Body of a request whose type is `ReqType::DirectOperation`.
#[derive(Debug, Clone)]
pub struct DirectOperationPayload {
/// `label` entry of the payload; JSON strings are taken as-is and JSON
/// integers are rendered in decimal when parsing.
pub label: Option<String>,
/// `version` entry of the payload; JSON strings are taken as-is and JSON
/// integers are rendered in decimal when parsing.
pub version: Option<String>,
/// Nested `payload` entry. When parsing, a string that contains valid JSON is
/// stored decoded; a missing entry is stored as `Value::Null`.
pub payload: Value,
/// Outer payload JSON this struct was built from. `ReqPayload::to_json` does
/// not read it for direct operations; it re-encodes the typed fields instead.
pub raw: Value,
}
/// A request envelope: identifiers, request type, and the typed body.
#[derive(Debug, Clone)]
pub struct ReqPayload {
/// `app_id` entry of the envelope; only JSON strings are accepted when parsing.
pub app_id: Option<String>,
/// Request identifier (parsed from `request_id`, falling back to `requestId`).
pub request_id: RequestId,
/// Request type decoded from the numeric `type` entry.
pub req_type: ReqType,
/// Typed body selected according to `req_type`.
pub payload: ReqBody,
/// The whole envelope JSON as handed to `ReqPayload::parse`, or `Value::Null`
/// when built by one of the `new_*` constructors. `to_json` does not read it.
pub raw: Value,
}
/// Typed request body; the variant mirrors the request's `ReqType`.
#[derive(Debug, Clone)]
pub enum ReqBody {
/// Body for `ReqType::InitSync`.
InitSync(InitSyncPayload),
/// Body for `ReqType::CursorSync`.
CursorSync(CursorSyncPayload),
/// Body for `ReqType::TaskBatch`.
TaskBatch(TaskBatchPayload),
/// Body for `ReqType::DirectOperation`.
DirectOperation(DirectOperationPayload),
/// Decoded payload of a request with an unrecognized type, kept untyped.
Raw(Value),
}
impl ReqPayload {
    /// Builds a task-batch request.
    ///
    /// `epoch_id` is stored in its decimal string form. Both `raw` fields are
    /// left as `Value::Null`; [`ReqPayload::to_json`] re-encodes task batches
    /// from the typed fields, so nothing is lost.
    pub fn new_task_batch(
        app_id: impl Into<String>,
        request_id: u64,
        epoch_id: u64,
        version_id: impl Into<String>,
        tasks: Vec<Task>,
    ) -> Self {
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::TaskBatch,
            payload: ReqBody::TaskBatch(TaskBatchPayload {
                epoch_id: Some(epoch_id.to_string()),
                version_id: Some(version_id.into()),
                tasks,
                raw: Value::Null,
            }),
            raw: Value::Null,
        }
    }

    /// Builds a cursor-sync request.
    ///
    /// The body's `raw` JSON mirrors the typed fields and additionally carries
    /// a `failure_count` entry set to `null`. The envelope-level `raw` is
    /// `Value::Null`.
    pub fn new_cursor_sync(
        app_id: impl Into<String>,
        request_id: u64,
        database: u64,
        epoch: u64,
        last_applied_cursor: Option<String>,
        sync_params: Option<Value>,
        version: u64,
    ) -> Self {
        // `json!` serializes interpolated expressions by reference, so the raw
        // payload is assembled first and the owned options are then moved into
        // the struct without cloning.
        let raw = serde_json::json!({
            "database": database,
            "epoch": epoch,
            "failure_count": Value::Null,
            "last_applied_cursor": last_applied_cursor,
            "sync_params": sync_params,
            "version": version,
        });
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::CursorSync,
            payload: ReqBody::CursorSync(CursorSyncPayload {
                database: Some(database),
                epoch: Some(epoch),
                last_applied_cursor,
                sync_params,
                version: Some(version),
                raw,
            }),
            raw: Value::Null,
        }
    }

    /// Builds an init-sync request.
    ///
    /// `epoch` has no typed field on [`InitSyncPayload`]; it is only recorded
    /// in the body's `raw` JSON, which is what [`ReqPayload::to_json`] emits
    /// for this variant. `sync_params` is written as the *string* form of
    /// `{"locale":"en_US"}`, and `failure_count` / `last_applied_cursor` are
    /// `null`.
    pub fn new_init_sync(
        app_id: impl Into<String>,
        request_id: u64,
        database: u64,
        epoch: u64,
        version: u64,
    ) -> Self {
        let raw = serde_json::json!({
            "database": database,
            "epoch": epoch,
            "failure_count": Value::Null,
            "last_applied_cursor": Value::Null,
            "sync_params": serde_json::json!({"locale":"en_US"}).to_string(),
            "version": version,
        });
        Self {
            app_id: Some(app_id.into()),
            request_id: RequestId::Int(request_id),
            req_type: ReqType::InitSync,
            payload: ReqBody::InitSync(InitSyncPayload {
                database: Some(database),
                version: Some(version),
                raw,
            }),
            raw: Value::Null,
        }
    }

    /// Decodes a request envelope from JSON. This never fails: anything that
    /// cannot be read becomes `None`, `Value::Null`, or an unknown type.
    ///
    /// * `app_id` is read only when it is a JSON string.
    /// * The request id is taken from `request_id`, falling back to `requestId`.
    /// * `type` is read only when it is a JSON integer; otherwise the request
    ///   type is `ReqType::Unknown(-1)`.
    /// * `payload` may be a JSON value or a string containing JSON; a string
    ///   that is not valid JSON is kept as a plain string.
    ///
    /// For task batches, entries of `tasks` that `Task::parse` rejects are
    /// dropped silently. The original envelope is kept in `raw`.
    pub fn parse(root: Value) -> Self {
        let app_id = root
            .get("app_id")
            .and_then(Value::as_str)
            .map(ToString::to_string);
        let request_id = RequestId::parse(root.get("request_id").or_else(|| root.get("requestId")));
        let req_type = ReqType::parse(root.get("type").and_then(Value::as_i64));
        let payload_value = decode_payload_field(root.get("payload"));

        // Each arm extracts its typed fields (all owned values) and then moves
        // `payload_value` into `raw` as the last field, so the decoded JSON
        // tree is never deep-copied.
        let payload = match req_type {
            ReqType::InitSync => ReqBody::InitSync(InitSyncPayload {
                database: payload_value.get("database").and_then(value_to_u64),
                version: payload_value.get("version").and_then(value_to_u64),
                raw: payload_value,
            }),
            ReqType::CursorSync => ReqBody::CursorSync(CursorSyncPayload {
                database: payload_value.get("database").and_then(value_to_u64),
                epoch: payload_value.get("epoch").and_then(value_to_u64),
                last_applied_cursor: payload_value
                    .get("last_applied_cursor")
                    .and_then(Value::as_str)
                    .map(ToString::to_string),
                sync_params: payload_value.get("sync_params").cloned(),
                version: payload_value.get("version").and_then(value_to_u64),
                raw: payload_value,
            }),
            ReqType::TaskBatch => {
                let tasks = payload_value
                    .get("tasks")
                    .and_then(Value::as_array)
                    .map(|items| items.iter().filter_map(Task::parse).collect())
                    .unwrap_or_default();
                ReqBody::TaskBatch(TaskBatchPayload {
                    epoch_id: payload_value.get("epoch_id").and_then(value_to_string),
                    version_id: payload_value.get("version_id").and_then(value_to_string),
                    tasks,
                    raw: payload_value,
                })
            }
            ReqType::DirectOperation => ReqBody::DirectOperation(DirectOperationPayload {
                label: payload_value.get("label").and_then(value_to_string),
                version: payload_value.get("version").and_then(value_to_string),
                // The nested payload may itself be JSON text; decode it the same way.
                payload: decode_payload_field(payload_value.get("payload")),
                raw: payload_value,
            }),
            ReqType::Unknown(_) => ReqBody::Raw(payload_value),
        };

        Self {
            app_id,
            request_id,
            req_type,
            payload,
            raw: root,
        }
    }

    /// Encodes the request as an envelope object with `app_id`, `request_id`,
    /// `type`, and `payload`, where `payload` is always a JSON-encoded string.
    ///
    /// How the payload is produced depends on the body variant:
    /// * `InitSync` / `CursorSync` / `Raw` emit the stored JSON verbatim;
    ///   edits made to the typed fields of the sync bodies are **not**
    ///   reflected.
    /// * `TaskBatch` is rebuilt from `epoch_id`, `version_id`, and `tasks`.
    /// * `DirectOperation` is rebuilt from `label`, `version`, and `payload`;
    ///   the nested payload is itself written as a string (kept as-is when it
    ///   already is one, JSON-encoded otherwise).
    ///
    /// The envelope-level `raw` field is not consulted.
    pub fn to_json(&self) -> Value {
        let payload = match &self.payload {
            ReqBody::InitSync(value) => value.raw.clone(),
            ReqBody::CursorSync(value) => value.raw.clone(),
            ReqBody::TaskBatch(value) => {
                let tasks: Vec<Value> = value.tasks.iter().map(Task::encode_value).collect();
                serde_json::json!({
                    "epoch_id": value.epoch_id,
                    "version_id": value.version_id,
                    "tasks": tasks,
                })
            }
            ReqBody::DirectOperation(value) => serde_json::json!({
                "label": value.label,
                "version": value.version,
                "payload": if value.payload.is_string() {
                    value.payload.clone()
                } else {
                    Value::String(value.payload.to_string())
                }
            }),
            ReqBody::Raw(value) => value.clone(),
        };
        serde_json::json!({
            "app_id": self.app_id,
            "request_id": self.request_id.value_json(),
            "type": self.req_type.value(),
            "payload": payload.to_string(),
        })
    }
}
/// Normalizes a payload entry that may be either JSON or JSON-in-a-string.
///
/// * Missing entry: `Value::Null`.
/// * String holding valid JSON: the decoded value.
/// * Anything else (including a string that is not valid JSON): a copy of the
///   entry as-is.
fn decode_payload_field(value: Option<&Value>) -> Value {
    value.map_or(Value::Null, |field| {
        field
            .as_str()
            .and_then(|text| serde_json::from_str::<Value>(text).ok())
            // Non-strings and undecodable strings are passed through unchanged.
            .unwrap_or_else(|| field.clone())
    })
}
/// Renders a JSON string or integer as an owned `String`.
///
/// Strings are copied verbatim and integers are formatted in decimal. Any
/// value for which `as_str`, `as_u64`, and `as_i64` all return `None` yields
/// `None`.
fn value_to_string(value: &Value) -> Option<String> {
    if let Some(text) = value.as_str() {
        return Some(text.to_string());
    }
    // Prefer the unsigned view, then fall back to the signed one.
    let digits = match value.as_u64() {
        Some(unsigned) => unsigned.to_string(),
        None => value.as_i64()?.to_string(),
    };
    Some(digits)
}
/// Reads a `u64` from a JSON value, accepting integers and decimal strings.
///
/// Tried in order: the value's unsigned integer view, its signed integer view
/// (when non-negative), and finally its string form parsed as `u64`. Returns
/// `None` when none of these apply.
fn value_to_u64(value: &Value) -> Option<u64> {
    if let Some(unsigned) = value.as_u64() {
        return Some(unsigned);
    }
    if let Some(signed) = value.as_i64() {
        // Negative values fail the conversion and fall through to the string check.
        if let Ok(converted) = u64::try_from(signed) {
            return Some(converted);
        }
    }
    value.as_str()?.parse::<u64>().ok()
}