use serde::{Deserialize, Serialize};
pub const POLLERS_LIST_METHOD: &str = "nexo/admin/pollers/list";
pub const POLLERS_GET_METHOD: &str = "nexo/admin/pollers/get";
pub const POLLERS_UPSERT_METHOD: &str = "nexo/admin/pollers/upsert";
pub const POLLERS_DELETE_METHOD: &str = "nexo/admin/pollers/delete";
pub const POLLERS_PAUSE_METHOD: &str = "nexo/admin/pollers/pause";
pub const POLLERS_RESUME_METHOD: &str = "nexo/admin/pollers/resume";
pub const POLLERS_RUN_NOW_METHOD: &str = "nexo/admin/pollers/run_now";
pub const POLLER_ID_REGEX: &str = r"^[a-z][a-z0-9_]{1,63}$";
pub const FAILURE_RECIPIENT_MAX_LEN: usize = 256;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PollerSchedule {
Every {
every_secs: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
stagger_jitter_ms: Option<u64>,
},
Cron {
cron: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
tz: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
stagger_jitter_ms: Option<u64>,
},
At {
at: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeliveryTargetWire {
pub channel: String,
pub to: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollerEntry {
pub id: String,
pub kind: String,
pub agent: String,
pub schedule: PollerSchedule,
#[serde(default)]
pub config: serde_json::Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub failure_to: Option<DeliveryTargetWire>,
#[serde(default)]
pub paused_on_boot: bool,
pub paused: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_run_at_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub next_run_at_ms: Option<i64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_status: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
pub consecutive_errors: i64,
pub items_seen_total: i64,
pub items_dispatched_total: i64,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct PollersListFilter {
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct PollersListResponse {
pub pollers: Vec<PollerEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersGetParams {
pub id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersUpsertInput {
pub id: String,
pub kind: String,
pub agent: String,
pub schedule: PollerSchedule,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config: Option<serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub failure_to: Option<DeliveryTargetWire>,
#[serde(default)]
pub paused_on_boot: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersUpsertResponse {
pub entry: PollerEntry,
pub created: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersDeleteParams {
pub id: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct PollersDeleteResponse {
pub removed: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersRuntimeParams {
pub id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PollersRuntimeResponse {
pub id: String,
pub applied: bool,
pub new_state: String,
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn method_literals_match_expected_jsonrpc_paths() {
assert_eq!(POLLERS_LIST_METHOD, "nexo/admin/pollers/list");
assert_eq!(POLLERS_GET_METHOD, "nexo/admin/pollers/get");
assert_eq!(POLLERS_UPSERT_METHOD, "nexo/admin/pollers/upsert");
assert_eq!(POLLERS_DELETE_METHOD, "nexo/admin/pollers/delete");
assert_eq!(POLLERS_PAUSE_METHOD, "nexo/admin/pollers/pause");
assert_eq!(POLLERS_RESUME_METHOD, "nexo/admin/pollers/resume");
assert_eq!(POLLERS_RUN_NOW_METHOD, "nexo/admin/pollers/run_now");
}
#[test]
fn id_regex_pattern_is_locked() {
assert_eq!(POLLER_ID_REGEX, r"^[a-z][a-z0-9_]{1,63}$");
}
#[test]
fn schedule_every_round_trips() {
let s = PollerSchedule::Every {
every_secs: 60,
stagger_jitter_ms: Some(2_000),
};
let v = serde_json::to_value(&s).unwrap();
assert_eq!(v["every_secs"], 60);
assert_eq!(v["stagger_jitter_ms"], 2_000);
assert!(v.get("cron").is_none(), "cron field absent on Every");
let back: PollerSchedule = serde_json::from_value(v).unwrap();
assert_eq!(s, back);
}
#[test]
fn schedule_every_omits_jitter_when_none() {
let s = PollerSchedule::Every {
every_secs: 30,
stagger_jitter_ms: None,
};
let txt = serde_json::to_string(&s).unwrap();
assert!(!txt.contains("stagger_jitter_ms"));
let back: PollerSchedule = serde_json::from_str(&txt).unwrap();
assert_eq!(s, back);
}
#[test]
fn schedule_cron_round_trips_with_tz() {
let s = PollerSchedule::Cron {
cron: "0 0 8 * * *".into(),
tz: Some("America/Bogota".into()),
stagger_jitter_ms: None,
};
let v = serde_json::to_value(&s).unwrap();
assert_eq!(v["cron"], "0 0 8 * * *");
assert_eq!(v["tz"], "America/Bogota");
let back: PollerSchedule = serde_json::from_value(v).unwrap();
assert_eq!(s, back);
}
#[test]
fn schedule_at_round_trips() {
let s = PollerSchedule::At {
at: "2026-12-31T23:59:59Z".into(),
};
let v = serde_json::to_value(&s).unwrap();
assert_eq!(v["at"], "2026-12-31T23:59:59Z");
let back: PollerSchedule = serde_json::from_value(v).unwrap();
assert_eq!(s, back);
}
#[test]
fn entry_round_trips_with_runtime_fields() {
let e = PollerEntry {
id: "ana_email_leads".into(),
kind: "gmail".into(),
agent: "ana".into(),
schedule: PollerSchedule::Every {
every_secs: 600,
stagger_jitter_ms: None,
},
config: json!({ "query": "is:unread" }),
failure_to: Some(DeliveryTargetWire {
channel: "telegram".into(),
to: "1194292426".into(),
}),
paused_on_boot: false,
paused: false,
last_run_at_ms: Some(1_700_000_000_000),
next_run_at_ms: Some(1_700_000_600_000),
last_status: Some("ok".into()),
last_error: None,
consecutive_errors: 0,
items_seen_total: 42,
items_dispatched_total: 7,
};
let v = serde_json::to_value(&e).unwrap();
assert!(v.get("last_error").is_none());
let back: PollerEntry = serde_json::from_value(v).unwrap();
assert_eq!(e, back);
}
#[test]
fn upsert_input_round_trip_with_minimal_config() {
let i = PollersUpsertInput {
id: "etb_lead_router".into(),
kind: "agent_turn".into(),
agent: "etb_lead_router".into(),
schedule: PollerSchedule::Every {
every_secs: 300,
stagger_jitter_ms: None,
},
config: None,
failure_to: None,
paused_on_boot: true,
};
let v = serde_json::to_value(&i).unwrap();
let obj = v.as_object().unwrap();
assert!(!obj.contains_key("config"));
assert!(!obj.contains_key("failure_to"));
assert_eq!(v["paused_on_boot"], true);
let back: PollersUpsertInput = serde_json::from_value(v).unwrap();
assert_eq!(i, back);
}
#[test]
fn list_filter_defaults_skip_on_wire() {
let f = PollersListFilter::default();
let v = serde_json::to_value(&f).unwrap();
let obj = v.as_object().unwrap();
assert!(obj.is_empty(), "default filter serializes to {{}}");
let f2 = PollersListFilter {
agent_id: Some("ana".into()),
kind: None,
};
let v2 = serde_json::to_value(&f2).unwrap();
assert_eq!(v2["agent_id"], "ana");
assert!(v2.get("kind").is_none());
}
#[test]
fn runtime_response_carries_state_label() {
let r = PollersRuntimeResponse {
id: "ana_email_leads".into(),
applied: true,
new_state: "paused".into(),
};
let v = serde_json::to_value(&r).unwrap();
assert_eq!(v["new_state"], "paused");
let back: PollersRuntimeResponse = serde_json::from_value(v).unwrap();
assert_eq!(r, back);
}
}