use std::collections::HashMap;
use async_nats::jetstream::consumer::pull::Config as PullConfig;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use async_nats::jetstream::kv::Operation;
use chrono::{DateTime, Utc};
use futures::StreamExt;
use kanade_shared::ipc::envelope::RpcNotification;
use kanade_shared::ipc::error::{ErrorKind, RpcError};
use kanade_shared::ipc::method;
use kanade_shared::ipc::notifications::{
Notification, NotificationAcked, NotificationNewParams, NotificationsAckParams,
NotificationsAckResult, NotificationsFilter, NotificationsListParams, NotificationsListResult,
NotificationsSubscribeParams, NotificationsSubscribeResult, NotificationsUnsubscribeParams,
};
use kanade_shared::kv::{
BUCKET_AGENT_GROUPS, BUCKET_NOTIFICATIONS_READ, STREAM_NOTIFICATIONS, notifications_read_key,
notifications_read_prefix,
};
use kanade_shared::subject;
use serde::Deserialize;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
use super::super::connection::ConnectionState;
use super::super::notify_bus::filter_subjects;
use super::system::HandlerResult;
use crate::groups::parse_groups;
/// Upper bound on how many replayed notifications `replay_notifications`
/// keeps buffered; once exceeded, the oldest buffered entry is evicted.
const MAX_REPLAY: usize = 5000;
/// Maximum number of messages requested per fetch while replaying the stream.
const REPLAY_BATCH: usize = 500;
/// Largest page size `handle_notifications_list` honours; the requested limit
/// is clamped into `1..=MAX_LIMIT`.
const MAX_LIMIT: usize = 200;
/// Starts pushing notifications to the caller's connection.
///
/// Obtains a receiver from the connection's notification bus, spawns a
/// [`forward_notifications`] task that relays each notification onto the
/// connection's push channel, and registers the task handle under the `"n"`
/// subscription kind.
///
/// # Errors
///
/// Returns `ErrorKind::InternalError` when the connection has no notification
/// bus attached.
pub fn handle_notifications_subscribe(
    conn: &mut ConnectionState,
    _params: NotificationsSubscribeParams,
) -> HandlerResult<NotificationsSubscribeResult> {
    let Some(bus_rx) = conn.notif_subscribe() else {
        return Err(RpcError::new(
            ErrorKind::InternalError,
            "notification bus not available on this agent build",
        ));
    };

    // The forwarder owns its own clones so it can outlive this borrow of `conn`.
    let forwarder = tokio::spawn(forward_notifications(
        bus_rx,
        conn.push_tx.clone(),
        conn.pc_id.clone(),
    ));

    let subscription = conn.subscriptions.register("n", forwarder);
    Ok(NotificationsSubscribeResult { subscription })
}
/// Cancels a subscription previously created on this connection.
///
/// # Errors
///
/// Returns `ErrorKind::NotFound` when the connection's subscription registry
/// does not know `params.subscription`.
pub fn handle_notifications_unsubscribe(
    conn: &mut ConnectionState,
    params: NotificationsUnsubscribeParams,
) -> HandlerResult<()> {
    if conn.subscriptions.unsubscribe(&params.subscription) {
        Ok(())
    } else {
        Err(RpcError::new(
            ErrorKind::NotFound,
            format!("subscription '{}' not found", params.subscription),
        ))
    }
}
/// Records that the calling user acknowledged a notification.
///
/// The handler validates the caller and the notification id, writes a read
/// mark (`acked_at` / `acked_by`) into the `BUCKET_NOTIFICATIONS_READ` KV
/// bucket, then publishes a `NotificationAcked` event to JetStream and waits
/// for the publish to be confirmed.
///
/// # Errors
///
/// * `ErrorKind::Unauthorized` — the caller SID is empty or `"<unknown>"`.
/// * `ErrorKind::InvalidParams` — the trimmed id fails `valid_notification_id`.
/// * `ErrorKind::InternalError` — no NATS client on the connection, or any
///   KV / serialisation / publish step failed.
pub async fn handle_notifications_ack(
    conn: &ConnectionState,
    params: NotificationsAckParams,
) -> HandlerResult<NotificationsAckResult> {
    // Checks run in a fixed order: caller identity, then id, then NATS
    // availability. Each failure returns before any later step runs.
    let user_sid = conn.peer.user_sid.as_str();
    if user_sid.is_empty() || user_sid == "<unknown>" {
        return Err(RpcError::new(
            ErrorKind::Unauthorized,
            "caller SID could not be resolved; cannot record ack",
        ));
    }
    // The id is embedded in a KV key and a subject below, so it is validated
    // against a restricted character set after trimming whitespace.
    let notif_id = params.id.trim();
    if !valid_notification_id(notif_id) {
        return Err(RpcError::new(
            ErrorKind::InvalidParams,
            "notification id must be non-empty and contain only [A-Za-z0-9_.-]",
        ));
    }
    let client = conn.nats.as_ref().ok_or_else(|| {
        RpcError::new(
            ErrorKind::InternalError,
            "NATS client not available on this agent build",
        )
    })?;
    let pc_id = conn.pc_id.as_str();
    // One timestamp is shared by the stored mark, the event, and the reply.
    let acked_at = Utc::now();
    let js = async_nats::jetstream::new(client.clone());
    let kv = js
        .get_key_value(BUCKET_NOTIFICATIONS_READ)
        .await
        .map_err(|e| {
            RpcError::new(
                ErrorKind::InternalError,
                format!("open {BUCKET_NOTIFICATIONS_READ} KV: {e}"),
            )
        })?;
    // Step 1: persist the read mark keyed by (pc_id, user_sid, notification id).
    let key = notifications_read_key(pc_id, user_sid, notif_id);
    let value = serde_json::to_vec(&serde_json::json!({
        "acked_at": acked_at,
        "acked_by": user_sid,
    }))
    .map_err(|e| RpcError::new(ErrorKind::InternalError, e.to_string()))?;
    kv.put(key, value.into()).await.map_err(|e| {
        RpcError::new(
            ErrorKind::InternalError,
            format!("write {BUCKET_NOTIFICATIONS_READ}: {e}"),
        )
    })?;
    // Step 2: publish the acknowledgement event. This happens only after the
    // KV write succeeded.
    let event = NotificationAcked {
        notification_id: notif_id.to_string(),
        pc_id: pc_id.to_string(),
        user_sid: user_sid.to_string(),
        acked_at,
    };
    let payload = serde_json::to_vec(&event)
        .map_err(|e| RpcError::new(ErrorKind::InternalError, e.to_string()))?;
    let subj = subject::events_notifications_acked(pc_id, user_sid, notif_id);
    let ack = js
        .publish(subj.clone(), payload.into())
        .await
        .map_err(|e| RpcError::new(ErrorKind::InternalError, format!("publish {subj}: {e}")))?;
    // The publish future resolves to a second future; awaiting it waits for
    // the publish acknowledgement, and a failure there is surfaced as an error.
    ack.await.map_err(|e| {
        RpcError::new(
            ErrorKind::InternalError,
            format!("ack publish to {subj} not confirmed: {e}"),
        )
    })?;
    info!(
        pc_id = %pc_id,
        user_sid = %user_sid,
        notification_id = %notif_id,
        "notification acked",
    );
    Ok(NotificationsAckResult { acked_at })
}
/// Lists the notifications visible to the caller, one page at a time.
///
/// The handler resolves this machine's groups, replays the notification
/// stream for the matching subjects, loads the caller's read marks, and hands
/// everything to [`build_notifications_list`] for filtering, ordering and
/// pagination. The page size is clamped to `1..=MAX_LIMIT`; the cursor is
/// decoded by [`decode_cursor`].
///
/// # Errors
///
/// * `ErrorKind::InternalError` — no NATS client on the connection, or one of
///   the JetStream reads failed.
/// * `ErrorKind::Unauthorized` — the caller SID is empty or `"<unknown>"`.
pub async fn handle_notifications_list(
    conn: &ConnectionState,
    params: NotificationsListParams,
) -> HandlerResult<NotificationsListResult> {
    let Some(client) = conn.nats.as_ref() else {
        return Err(RpcError::new(
            ErrorKind::InternalError,
            "notifications.list: NATS client not wired into the connection",
        ));
    };

    let caller_sid = conn.peer.user_sid.as_str();
    if matches!(caller_sid, "" | "<unknown>") {
        return Err(RpcError::new(
            ErrorKind::Unauthorized,
            "notifications.list: caller SID could not be resolved",
        ));
    }

    let jetstream = async_nats::jetstream::new(client.clone());

    // Gather the three inputs in sequence: groups -> stream replay -> read marks.
    let groups = read_my_groups(&jetstream, &conn.pc_id).await?;
    let wanted_subjects = filter_subjects(&conn.pc_id, &groups);
    let replayed = replay_notifications(&jetstream, wanted_subjects).await?;
    let read_marks = read_user_acks(&jetstream, &conn.pc_id, caller_sid).await?;

    let page_size = (params.limit as usize).clamp(1, MAX_LIMIT);
    let start = decode_cursor(params.cursor.as_deref());

    let listing = build_notifications_list(
        replayed,
        &read_marks,
        params.filter,
        Utc::now(),
        page_size,
        start,
    );
    Ok(listing)
}
/// Reads the group list stored for `pc_id` in the `BUCKET_AGENT_GROUPS` KV
/// bucket.
///
/// A missing key yields an empty list. A stored value is decoded with
/// `parse_groups`.
///
/// # Errors
///
/// Returns `ErrorKind::InternalError` (after logging a warning) when the
/// bucket cannot be opened or the key cannot be read.
async fn read_my_groups(
    js: &async_nats::jetstream::Context,
    pc_id: &str,
) -> HandlerResult<Vec<String>> {
    let store = match js.get_key_value(BUCKET_AGENT_GROUPS).await {
        Ok(store) => store,
        Err(e) => {
            warn!(error = %e, "notifications.list: open BUCKET_AGENT_GROUPS failed");
            return Err(RpcError::new(
                ErrorKind::InternalError,
                format!("notifications.list: open group membership: {e}"),
            ));
        }
    };

    let stored = store.get(pc_id).await.map_err(|e| {
        warn!(error = %e, "notifications.list: agent_groups read failed");
        RpcError::new(
            ErrorKind::InternalError,
            format!("notifications.list: read group membership: {e}"),
        )
    })?;

    // No entry for this machine simply means "member of no groups".
    Ok(stored.map_or_else(Vec::new, |bytes| parse_groups(&bytes)))
}
/// Replays the `STREAM_NOTIFICATIONS` stream for the given subjects and
/// returns the decoded notifications in the order they were received.
///
/// A pull consumer is created per call (no durable name is set here; the log
/// message refers to it as ephemeral). Messages are fetched in batches of
/// `REPLAY_BATCH`; at most `MAX_REPLAY` notifications are retained, with the
/// oldest buffered one evicted when the cap is exceeded. Payloads that fail
/// to deserialize are logged and skipped.
///
/// # Errors
///
/// Returns `ErrorKind::InternalError` when the stream cannot be opened, the
/// consumer cannot be created, a fetch fails, or a message in a batch is an
/// error.
async fn replay_notifications(
    js: &async_nats::jetstream::Context,
    subjects: Vec<String>,
) -> HandlerResult<Vec<Notification>> {
    let stream = js.get_stream(STREAM_NOTIFICATIONS).await.map_err(|e| {
        warn!(error = %e, "notifications.list: get_stream NOTIFICATIONS failed");
        RpcError::new(
            ErrorKind::InternalError,
            format!("notifications.list: open stream: {e}"),
        )
    })?;
    // Read-only replay from the start of the stream: deliver everything, ack
    // nothing. NOTE(review): the 30s inactive threshold presumably lets the
    // server reap the consumer once this call is done — confirm against the
    // async-nats consumer docs.
    let consumer = stream
        .create_consumer(PullConfig {
            deliver_policy: DeliverPolicy::All,
            ack_policy: AckPolicy::None,
            filter_subjects: subjects,
            inactive_threshold: std::time::Duration::from_secs(30),
            ..Default::default()
        })
        .await
        .map_err(|e| {
            warn!(error = %e, "notifications.list: create ephemeral consumer failed");
            RpcError::new(
                ErrorKind::InternalError,
                format!("notifications.list: create consumer: {e}"),
            )
        })?;
    // A deque gives cheap front eviction once the buffer exceeds MAX_REPLAY.
    let mut buf: std::collections::VecDeque<Notification> =
        std::collections::VecDeque::with_capacity(REPLAY_BATCH.min(MAX_REPLAY));
    // Number of notifications evicted because of the cap (for the final warning).
    let mut dropped = 0usize;
    loop {
        let mut batch = consumer
            .fetch()
            .max_messages(REPLAY_BATCH)
            .expires(std::time::Duration::from_millis(200))
            .messages()
            .await
            .map_err(|e| {
                warn!(error = %e, "notifications.list: fetch failed");
                RpcError::new(
                    ErrorKind::InternalError,
                    format!("notifications.list: fetch: {e}"),
                )
            })?;
        // Counts every message received in this batch, parseable or not.
        let mut got = 0usize;
        while let Some(m) = batch.next().await {
            let m = m.map_err(|e| {
                RpcError::new(
                    ErrorKind::InternalError,
                    format!("notifications.list: message: {e}"),
                )
            })?;
            got += 1;
            match serde_json::from_slice::<Notification>(&m.payload) {
                Ok(n) => {
                    buf.push_back(n);
                    // Keep only the most recently received MAX_REPLAY entries.
                    if buf.len() > MAX_REPLAY {
                        buf.pop_front();
                        dropped += 1;
                    }
                }
                Err(e) => warn!(
                    error = %e,
                    subject = %m.subject,
                    "notifications.list: skipping unparseable notification",
                ),
            }
        }
        // A short batch is treated as "caught up"; a full batch means there
        // may be more to fetch.
        if got < REPLAY_BATCH {
            break;
        }
    }
    if dropped > 0 {
        warn!(
            dropped,
            cap = MAX_REPLAY,
            "notifications.list: stream exceeded replay cap; oldest beyond the cap omitted",
        );
    }
    Ok(Vec::from(buf))
}
/// The part of a stored read-mark value that `read_user_acks` decodes; only
/// the acknowledgement timestamp is read back.
#[derive(Deserialize)]
struct ReadMark {
    /// Timestamp stored under the `acked_at` field of the read mark.
    acked_at: DateTime<Utc>,
}
/// Loads the read marks recorded for `user_sid` on `pc_id`.
///
/// Scans the `BUCKET_NOTIFICATIONS_READ` KV bucket for every key under the
/// user's prefix and returns a map from notification id (the key with the
/// prefix stripped) to its acknowledgement time.
///
/// The scan walks the bucket history, so a key may appear several times.
/// Entries are applied in the order they are yielded: a `Put` records (or
/// overwrites) the mark, and any other operation (delete / purge) removes a
/// mark recorded by an earlier revision, so a removed mark is never reported
/// as acknowledged. Values that fail to deserialize are logged and skipped.
///
/// # Errors
///
/// Returns `ErrorKind::InternalError` (after logging a warning) when the
/// bucket cannot be opened, the history scan cannot be started, or the
/// history stream yields an error.
async fn read_user_acks(
    js: &async_nats::jetstream::Context,
    pc_id: &str,
    user_sid: &str,
) -> HandlerResult<HashMap<String, DateTime<Utc>>> {
    let kv = js
        .get_key_value(BUCKET_NOTIFICATIONS_READ)
        .await
        .map_err(|e| {
            warn!(error = %e, "notifications.list: open notifications_read failed");
            RpcError::new(
                ErrorKind::InternalError,
                format!("notifications.list: open read state: {e}"),
            )
        })?;
    let prefix = notifications_read_prefix(pc_id, user_sid);
    // `>` is the NATS multi-token wildcard: match every key under the prefix.
    let wildcard = format!("{prefix}>");
    let mut history = kv.history(&wildcard).await.map_err(|e| {
        warn!(error = %e, %wildcard, "notifications.list: notifications_read history() failed");
        RpcError::new(
            ErrorKind::InternalError,
            format!("notifications.list: scan read state: {e}"),
        )
    })?;
    let mut out: HashMap<String, DateTime<Utc>> = HashMap::new();
    while let Some(entry) = history.next().await {
        let entry = entry.map_err(|e| {
            warn!(error = %e, "notifications.list: read-state history stream faulted");
            RpcError::new(
                ErrorKind::InternalError,
                format!("notifications.list: stream read state: {e}"),
            )
        })?;
        let Some(id) = entry.key.strip_prefix(&prefix) else {
            continue;
        };
        if entry.operation != Operation::Put {
            // A delete/purge supersedes any earlier Put for the same key;
            // without this, a bucket that retains more than one revision
            // would keep reporting the removed mark as acknowledged.
            out.remove(id);
            continue;
        }
        match serde_json::from_slice::<ReadMark>(&entry.value) {
            Ok(mark) => {
                out.insert(id.to_string(), mark.acked_at);
            }
            Err(e) => {
                warn!(key = %entry.key, error = %e, "notifications.list: skipping unparseable read mark")
            }
        }
    }
    Ok(out)
}
/// Turns a pagination cursor into a zero-based offset.
///
/// The cursor is the decimal offset of the next item. A missing cursor, or
/// one that does not parse as a `usize`, means "start from the beginning".
fn decode_cursor(cursor: Option<&str>) -> usize {
    let Some(raw) = cursor else {
        return 0;
    };
    raw.parse().unwrap_or(0)
}
/// Builds one page of the notification listing from already-fetched data.
///
/// Steps, in order:
/// 1. Drop notifications whose `expires_at` is at or before `now`.
/// 2. Annotate each survivor with its ack time from `acks` (if any).
/// 3. Collapse duplicates by id, keeping the one with the strictly newest
///    `issued_at` (the first seen wins ties).
/// 4. Apply `filter` (`Unread` keeps only un-acked items).
/// 5. Sort newest-first, breaking ties by ascending id.
/// 6. Return `limit` items starting at `offset`, plus a `next_cursor` holding
///    the following offset when more items remain.
fn build_notifications_list(
    notifications: Vec<Notification>,
    acks: &HashMap<String, DateTime<Utc>>,
    filter: NotificationsFilter,
    now: DateTime<Utc>,
    limit: usize,
    offset: usize,
) -> NotificationsListResult {
    // `slot_by_id` maps a notification id to its position in `unique`.
    let mut slot_by_id: HashMap<String, usize> = HashMap::new();
    let mut unique: Vec<Notification> = Vec::new();

    for mut candidate in notifications {
        let expired = matches!(candidate.expires_at, Some(deadline) if deadline <= now);
        if expired {
            continue;
        }

        candidate.acked_at = acks.get(&candidate.id).copied();

        if let Some(&slot) = slot_by_id.get(&candidate.id) {
            // Replace only when strictly newer, so the earliest-seen entry
            // survives an `issued_at` tie.
            if candidate.issued_at > unique[slot].issued_at {
                unique[slot] = candidate;
            }
        } else {
            slot_by_id.insert(candidate.id.clone(), unique.len());
            unique.push(candidate);
        }
    }

    let mut visible = unique;
    match filter {
        NotificationsFilter::Unread => visible.retain(|n| n.acked_at.is_none()),
        NotificationsFilter::All => {}
    }

    visible.sort_by(|lhs, rhs| {
        rhs.issued_at
            .cmp(&lhs.issued_at)
            .then_with(|| lhs.id.cmp(&rhs.id))
    });

    let total = visible.len();
    let page: Vec<Notification> = visible.into_iter().skip(offset).take(limit).collect();
    let next_offset = offset + page.len();
    let next_cursor = if next_offset < total {
        Some(next_offset.to_string())
    } else {
        None
    };

    NotificationsListResult {
        items: page,
        next_cursor,
    }
}
/// Returns `true` when `id` is non-empty and made up solely of ASCII letters,
/// ASCII digits, `_`, `-` and `.`.
fn valid_notification_id(id: &str) -> bool {
    if id.is_empty() {
        return false;
    }
    // Every byte of a multi-byte UTF-8 character is >= 0x80, so checking bytes
    // rejects exactly the same inputs as checking chars.
    id.bytes()
        .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
}
async fn forward_notifications(
mut rx: broadcast::Receiver<Notification>,
push_tx: mpsc::Sender<Vec<u8>>,
pc_id: String,
) {
debug!(pc_id = %pc_id, "notifications forwarder: subscribed");
loop {
let notification = match rx.recv().await {
Ok(n) => n,
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!(
pc_id = %pc_id,
skipped,
"notifications forwarder: lagged; resuming at oldest buffered",
);
continue;
}
Err(broadcast::error::RecvError::Closed) => {
debug!(pc_id = %pc_id, "notifications forwarder: bus closed, exiting");
return;
}
};
let params = NotificationNewParams { notification };
let notif = match RpcNotification::new(method::NOTIFICATIONS_NEW, ¶ms) {
Ok(n) => n,
Err(e) => {
warn!(error = %e, "notifications forwarder: failed to encode notification");
continue;
}
};
let body = match serde_json::to_vec(¬if) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "notifications forwarder: failed to serialise frame");
continue;
}
};
if push_tx.send(body).await.is_err() {
debug!(pc_id = %pc_id, "notifications forwarder: push channel closed, exiting");
return;
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::klp::auth::PeerCredentials;
    use kanade_shared::ipc::envelope::RpcMessage;
    use kanade_shared::ipc::notifications::NotificationPriority;
    use kanade_shared::ipc::state::StateSnapshot;
    use kanade_shared::wire::EffectiveConfig;
    use std::path::PathBuf;
    use std::time::Duration;
    use tokio::sync::watch;

    /// Minimal state snapshot used only to satisfy `ConnectionState::new`.
    fn dummy_snapshot() -> StateSnapshot {
        StateSnapshot {
            pc_id: "PC1234".into(),
            online: true,
            vpn: "unknown".into(),
            checks: vec![],
            agent_version: "0.43.0".into(),
            target_version: "0.43.0".into(),
        }
    }

    /// An emergency, ack-required notification with the given id.
    fn sample_notification(id: &str) -> Notification {
        Notification {
            id: id.into(),
            priority: NotificationPriority::Emergency,
            require_ack: true,
            title: "緊急: ネットワーク機器メンテ".into(),
            body: "22時から30分停止します".into(),
            issued_at: chrono::Utc::now(),
            issued_by: Some("infra-team".into()),
            expires_at: None,
            acked_at: None,
        }
    }

    /// A connection wired to the given notification bus and push channel, with
    /// a resolved caller SID and no NATS client.
    fn fresh_conn(
        notif_tx: &broadcast::Sender<Notification>,
        push_tx: mpsc::Sender<Vec<u8>>,
    ) -> ConnectionState {
        let (_cfg_tx, cfg_rx) = watch::channel(EffectiveConfig::builtin_defaults());
        let (_state_tx, state_rx) = watch::channel(dummy_snapshot());
        ConnectionState::new(
            PeerCredentials {
                user: "DOMAIN\\alice".into(),
                user_sid: "S-1-5-21-1001".into(),
                session_id: 2,
            },
            "PC1234".into(),
            "0.43.0".into(),
            cfg_rx,
            state_rx,
            PathBuf::from("agent.log"),
            push_tx,
        )
        .with_notifications(notif_tx.clone())
    }

    /// Each subscribe call registers a forwarder and yields a fresh id.
    #[tokio::test]
    async fn subscribe_returns_sub_n_id_and_registers_forwarder() {
        let (notif_tx, _) = broadcast::channel(8);
        let (push_tx, _push_rx) = mpsc::channel(8);
        let mut conn = fresh_conn(&notif_tx, push_tx);
        let r1 = handle_notifications_subscribe(&mut conn, NotificationsSubscribeParams::default())
            .unwrap();
        let r2 = handle_notifications_subscribe(&mut conn, NotificationsSubscribeParams::default())
            .unwrap();
        assert_eq!(r1.subscription, "sub-n-1");
        assert_eq!(r2.subscription, "sub-n-2");
        assert_eq!(conn.subscriptions.len(), 2);
    }

    /// A notification sent on the bus arrives on the push channel as a
    /// `NOTIFICATIONS_NEW` frame carrying the same notification.
    #[tokio::test]
    async fn subscribed_forwarder_pushes_notifications_new() {
        let (notif_tx, _) = broadcast::channel(8);
        let (push_tx, mut push_rx) = mpsc::channel(8);
        let mut conn = fresh_conn(&notif_tx, push_tx);
        let _ = handle_notifications_subscribe(&mut conn, NotificationsSubscribeParams::default())
            .unwrap();
        notif_tx.send(sample_notification("notif-9f3a")).unwrap();
        let body = tokio::time::timeout(Duration::from_secs(1), push_rx.recv())
            .await
            .expect("forwarder should push within 1s")
            .expect("push_tx still open");
        let msg: RpcMessage = serde_json::from_slice(&body).expect("decode frame");
        match msg {
            RpcMessage::Notification(n) => {
                assert_eq!(n.method, method::NOTIFICATIONS_NEW);
                let params: NotificationNewParams =
                    serde_json::from_value(n.params).expect("decode NotificationNewParams");
                assert_eq!(params.notification.id, "notif-9f3a");
                assert_eq!(
                    params.notification.priority,
                    NotificationPriority::Emergency
                );
            }
            other => panic!("expected Notification, got {other:?}"),
        }
    }

    /// After unsubscribing, the registry is empty and nothing more is pushed.
    #[tokio::test]
    async fn unsubscribe_aborts_forwarder() {
        let (notif_tx, _) = broadcast::channel(8);
        let (push_tx, mut push_rx) = mpsc::channel(8);
        let mut conn = fresh_conn(&notif_tx, push_tx);
        let r = handle_notifications_subscribe(&mut conn, NotificationsSubscribeParams::default())
            .unwrap();
        assert_eq!(conn.subscriptions.len(), 1);
        handle_notifications_unsubscribe(
            &mut conn,
            NotificationsUnsubscribeParams {
                subscription: r.subscription,
            },
        )
        .expect("unsubscribe should succeed");
        assert_eq!(conn.subscriptions.len(), 0);
        notif_tx.send(sample_notification("notif-2")).unwrap();
        let res = tokio::time::timeout(Duration::from_millis(200), push_rx.recv()).await;
        assert!(res.is_err(), "expected no push after unsubscribe");
    }

    /// Unsubscribing an id that was never registered reports `NotFound`.
    #[tokio::test]
    async fn unsubscribe_unknown_id_returns_not_found() {
        let (notif_tx, _) = broadcast::channel(8);
        let (push_tx, _) = mpsc::channel(8);
        let mut conn = fresh_conn(&notif_tx, push_tx);
        let err = handle_notifications_unsubscribe(
            &mut conn,
            NotificationsUnsubscribeParams {
                subscription: "sub-n-999".into(),
            },
        )
        .expect_err("unknown id must error");
        assert_eq!(err.data.expect("data").kind, ErrorKind::NotFound);
    }

    /// A connection with the given caller SID, no notification bus and no
    /// NATS client — enough to exercise the ack handler's validation paths.
    fn conn_for_ack(user_sid: &str) -> ConnectionState {
        let (_cfg_tx, cfg_rx) = watch::channel(EffectiveConfig::builtin_defaults());
        let (_state_tx, state_rx) = watch::channel(dummy_snapshot());
        let (push_tx, _push_rx) = mpsc::channel(8);
        ConnectionState::new(
            PeerCredentials {
                user: "DOMAIN\\alice".into(),
                user_sid: user_sid.into(),
                session_id: 2,
            },
            "PC1234".into(),
            "0.43.0".into(),
            cfg_rx,
            state_rx,
            PathBuf::from("agent.log"),
            push_tx,
        )
    }

    #[test]
    fn valid_notification_id_accepts_ids_rejects_nats_unsafe() {
        for ok in ["notif-9f3a", "maintenance-2026-05-20", "a.b", "Job_123"] {
            assert!(valid_notification_id(ok), "{ok} should be valid");
        }
        for bad in ["", "has space", "wild*", "a>b", "with/slash", "qu?x"] {
            assert!(!valid_notification_id(bad), "{bad:?} should be invalid");
        }
    }

    /// Id validation runs before the NATS client is needed, so a bad id is
    /// reported as `InvalidParams` even on a connection without NATS.
    #[tokio::test]
    async fn ack_blank_or_unsafe_id_returns_invalid_params() {
        let conn = conn_for_ack("S-1-5-21-1001");
        for bad in [" ", "bad id", "wild*"] {
            let err = handle_notifications_ack(&conn, NotificationsAckParams { id: bad.into() })
                .await
                .expect_err("bad id must error");
            assert_eq!(
                err.data.expect("data").kind,
                ErrorKind::InvalidParams,
                "id {bad:?}",
            );
        }
    }

    /// An unresolved caller SID is rejected before anything else is checked.
    #[tokio::test]
    async fn ack_unknown_sid_is_rejected() {
        let conn = conn_for_ack("<unknown>");
        let err = handle_notifications_ack(
            &conn,
            NotificationsAckParams {
                id: "notif-1".into(),
            },
        )
        .await
        .expect_err("unknown SID must error");
        let data = err.data.expect("data");
        assert_eq!(data.kind, ErrorKind::Unauthorized);
        assert!(data.detail.contains("SID"), "detail: {}", data.detail);
    }

    /// With valid caller and id but no NATS client, the handler fails with
    /// `InternalError`.
    #[tokio::test]
    async fn ack_without_nats_client_errors_internal() {
        let conn = conn_for_ack("S-1-5-21-1001");
        let err = handle_notifications_ack(
            &conn,
            NotificationsAckParams {
                id: "notif-1".into(),
            },
        )
        .await
        .expect_err("missing NATS client must error");
        assert_eq!(err.data.expect("data").kind, ErrorKind::InternalError);
    }

    /// Fixed reference instant so the list tests are deterministic.
    fn list_base() -> DateTime<Utc> {
        chrono::TimeZone::with_ymd_and_hms(&Utc, 2026, 6, 1, 12, 0, 0).unwrap()
    }

    /// An info-level notification with explicit issue and expiry times.
    fn notif_at(id: &str, issued: DateTime<Utc>, expires: Option<DateTime<Utc>>) -> Notification {
        Notification {
            id: id.into(),
            priority: NotificationPriority::Info,
            require_ack: false,
            title: "t".into(),
            body: "b".into(),
            issued_at: issued,
            issued_by: None,
            expires_at: expires,
            acked_at: None,
        }
    }

    #[test]
    fn decode_cursor_parses_offset_or_zero() {
        assert_eq!(decode_cursor(None), 0);
        assert_eq!(decode_cursor(Some("25")), 25);
        assert_eq!(decode_cursor(Some("garbage")), 0);
    }

    #[test]
    fn unread_filter_excludes_acked_and_keeps_unacked() {
        let base = list_base();
        let items = vec![
            notif_at("a", base, None),
            notif_at("b", base + chrono::Duration::seconds(60), None),
        ];
        let mut acks = HashMap::new();
        acks.insert("a".to_string(), base + chrono::Duration::seconds(120));
        let r = build_notifications_list(items, &acks, NotificationsFilter::Unread, base, 50, 0);
        assert_eq!(r.items.len(), 1, "only the unacked notification remains");
        assert_eq!(r.items[0].id, "b");
        assert!(r.items[0].acked_at.is_none());
        assert!(r.next_cursor.is_none());
    }

    #[test]
    fn all_filter_includes_acked_with_acked_at_annotated() {
        let base = list_base();
        let items = vec![notif_at("a", base, None)];
        let mut acks = HashMap::new();
        let when = base + chrono::Duration::seconds(120);
        acks.insert("a".to_string(), when);
        let r = build_notifications_list(items, &acks, NotificationsFilter::All, base, 50, 0);
        assert_eq!(r.items.len(), 1);
        assert_eq!(
            r.items[0].acked_at,
            Some(when),
            "ack state annotated for history"
        );
    }

    #[test]
    fn drops_expired_in_both_filters() {
        let base = list_base();
        let past = base - chrono::Duration::seconds(60);
        let items = vec![
            notif_at("live", base, Some(base + chrono::Duration::seconds(3600))),
            notif_at("dead", base, Some(past)),
        ];
        for filter in [NotificationsFilter::Unread, NotificationsFilter::All] {
            let r = build_notifications_list(items.clone(), &HashMap::new(), filter, base, 50, 0);
            let ids: Vec<&str> = r.items.iter().map(|n| n.id.as_str()).collect();
            assert_eq!(
                ids,
                vec!["live"],
                "expired notification dropped ({filter:?})"
            );
        }
    }

    /// Pages come back newest-first, and the cursor from page one resumes at
    /// the right offset for page two.
    #[test]
    fn newest_first_and_offset_pagination() {
        let base = list_base();
        let items = vec![
            notif_at("oldest", base, None),
            notif_at("mid", base + chrono::Duration::seconds(60), None),
            notif_at("newest", base + chrono::Duration::seconds(120), None),
        ];
        let p1 = build_notifications_list(
            items.clone(),
            &HashMap::new(),
            NotificationsFilter::All,
            base,
            2,
            0,
        );
        let ids1: Vec<&str> = p1.items.iter().map(|n| n.id.as_str()).collect();
        assert_eq!(ids1, vec!["newest", "mid"], "newest first");
        assert_eq!(p1.next_cursor.as_deref(), Some("2"));
        let p2 = build_notifications_list(
            items,
            &HashMap::new(),
            NotificationsFilter::All,
            base,
            2,
            decode_cursor(p1.next_cursor.as_deref()),
        );
        let ids2: Vec<&str> = p2.items.iter().map(|n| n.id.as_str()).collect();
        assert_eq!(ids2, vec!["oldest"]);
        assert!(p2.next_cursor.is_none(), "tail page has no next cursor");
    }

    #[test]
    fn dedups_by_id_keeping_newest_issued() {
        let base = list_base();
        let items = vec![
            notif_at("dup", base, None),
            notif_at("dup", base + chrono::Duration::seconds(60), None),
        ];
        let r = build_notifications_list(
            items,
            &HashMap::new(),
            NotificationsFilter::All,
            base,
            50,
            0,
        );
        assert_eq!(r.items.len(), 1, "same id collapses to one");
        assert_eq!(r.items[0].issued_at, base + chrono::Duration::seconds(60));
    }
}