use car_engine::Runtime;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{info, warn};
use crate::bridge::{
a2a_state_for, action_results_to_artifacts, message_to_proposal, short_id, task_with_status,
ProposalBuildError,
};
use crate::events::{EventBus, StreamEvent, TaskArtifactUpdateEvent, TaskStatusUpdateEvent};
use crate::push::PushDispatcher;
use crate::store::{AbortRegistry, ListFilter, TaskRecord, TaskStore};
use crate::types::{
AgentCard, GetTaskParams, ListTasksParams, ListTasksResult, Message, MessageRole, Part,
PushNotificationConfig, PushNotificationConfigParams, SendMessageParams, SendMessageResult,
Task, TaskState, TextPart,
};
pub type AgentCardSource = dyn Fn() -> AgentCard + Send + Sync;
#[derive(Debug, thiserror::Error)]
pub enum A2aRpcError {
#[error("method `{0}` is not supported by this agent")]
MethodNotFound(String),
#[error("invalid params: {0}")]
InvalidParams(String),
#[error("task `{0}` not found")]
TaskNotFound(String),
#[error("push-notification config `{0}` not found on task `{1}`")]
PushConfigNotFound(String, String),
#[error("could not build proposal: {0}")]
BadProposal(#[from] ProposalBuildError),
#[error("internal error: {0}")]
Internal(String),
}
impl A2aRpcError {
pub fn code(&self) -> i32 {
match self {
A2aRpcError::MethodNotFound(_) => -32601,
A2aRpcError::InvalidParams(_) => -32602,
A2aRpcError::TaskNotFound(_) => -32001,
A2aRpcError::PushConfigNotFound(_, _) => -32002,
A2aRpcError::BadProposal(_) => -32602,
A2aRpcError::Internal(_) => -32603,
}
}
}
#[derive(Debug, Clone)]
enum PublishKind {
Status { final_event: bool },
Artifact(crate::types::Artifact),
}
#[derive(Clone)]
pub struct A2aDispatcher {
runtime: Arc<Runtime>,
store: Arc<dyn TaskStore>,
card: Arc<AgentCardSource>,
push: PushDispatcher,
events: EventBus,
aborts: Arc<AbortRegistry>,
}
impl A2aDispatcher {
pub fn new(
runtime: Arc<Runtime>,
store: Arc<dyn TaskStore>,
card: Arc<AgentCardSource>,
) -> Self {
Self {
runtime,
store,
card,
push: PushDispatcher::default(),
events: EventBus::new(),
aborts: Arc::new(AbortRegistry::new()),
}
}
pub fn with_push(mut self, push: PushDispatcher) -> Self {
self.push = push;
self
}
pub async fn subscribe(&self, task_id: &str) -> broadcast::Receiver<StreamEvent> {
self.events.subscribe(task_id).await
}
pub async fn start_message_stream(
&self,
params: SendMessageParams,
) -> Result<(Task, broadcast::Receiver<StreamEvent>), A2aRpcError> {
let proposal = message_to_proposal(¶ms.message)?;
if let Some(existing_id) = params.message.task_id.clone() {
if let Some(record) = self.store.get(&existing_id).await {
if !record.task.status.state.is_terminal() {
return self
.stream_continue_existing_task(&existing_id, params, proposal)
.await;
}
}
}
let context_id = params
.message
.context_id
.clone()
.unwrap_or_else(|| format!("ctx-{}", short_id()));
let task_id = format!("task-{}", short_id());
let mut history_msg = params.message.clone();
history_msg.task_id = Some(task_id.clone());
history_msg.context_id = Some(context_id.clone());
let initial_task = task_with_status(
task_id.clone(),
context_id,
TaskState::Submitted,
vec![history_msg],
vec![],
);
let initial_record = self.store.create(initial_task).await;
let receiver = self.events.subscribe(&task_id).await;
self.spawn_executor(task_id, proposal, true).await;
Ok((initial_record.task, receiver))
}
async fn stream_continue_existing_task(
&self,
task_id: &str,
params: SendMessageParams,
proposal: car_ir::ActionProposal,
) -> Result<(Task, broadcast::Receiver<StreamEvent>), A2aRpcError> {
let mut history_msg = params.message.clone();
history_msg.task_id = Some(task_id.to_string());
if history_msg.context_id.is_none() {
history_msg.context_id = self.store.get(task_id).await.map(|r| r.task.context_id);
}
let _ = self.store.append_history(task_id, history_msg).await;
let receiver = self.events.subscribe(task_id).await;
self.spawn_executor(task_id.to_string(), proposal, false)
.await;
let record = self
.store
.get(task_id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))?;
Ok((record.task, receiver))
}
pub async fn resubscribe_task(
&self,
task_id: &str,
) -> Result<(Task, broadcast::Receiver<StreamEvent>), A2aRpcError> {
let record = self
.store
.get(task_id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))?;
let receiver = self.events.subscribe(task_id).await;
Ok((record.task, receiver))
}
pub async fn current_task(&self, task_id: &str) -> Option<Task> {
self.store.get(task_id).await.map(|r| r.task)
}
async fn publish(&self, task_id: &str, kind: PublishKind) {
let Some(record) = self.store.get(task_id).await else {
return;
};
let task = record.task.clone();
let configs: Vec<PushNotificationConfig> =
record.push_configs.values().cloned().collect();
self.push.deliver(configs, task.clone());
let event = match kind {
PublishKind::Status { final_event } => {
StreamEvent::StatusUpdate(TaskStatusUpdateEvent {
task_id: task.id.clone(),
context_id: task.context_id.clone(),
kind: "status-update".into(),
status: task.status,
final_event,
metadata: HashMap::new(),
})
}
PublishKind::Artifact(artifact) => {
StreamEvent::ArtifactUpdate(TaskArtifactUpdateEvent {
task_id: task.id.clone(),
context_id: task.context_id.clone(),
kind: "artifact-update".into(),
artifact,
append: false,
last_chunk: true,
metadata: HashMap::new(),
})
}
};
self.events.publish(task_id, event).await;
}
pub async fn dispatch(&self, method: &str, params: Value) -> Result<Value, A2aRpcError> {
info!(method = %method, "a2a dispatch");
match method {
"message/send" | "SendMessage" => self.handle_send_message(params).await,
"message/stream" | "SendStreamingMessage" | "tasks/resubscribe" | "SubscribeToTask" => {
Err(A2aRpcError::MethodNotFound(method.to_string()))
}
"tasks/get" | "GetTask" => self.handle_get_task(params).await,
"tasks/list" | "ListTasks" => self.handle_list_tasks(params).await,
"tasks/cancel" | "CancelTask" => self.handle_cancel_task(params).await,
"tasks/pushNotificationConfig/set" | "CreateTaskPushNotificationConfig" => {
self.handle_push_set(params).await
}
"tasks/pushNotificationConfig/get" | "GetTaskPushNotificationConfig" => {
self.handle_push_get(params).await
}
"tasks/pushNotificationConfig/list" | "ListTaskPushNotificationConfigs" => {
self.handle_push_list(params).await
}
"tasks/pushNotificationConfig/delete" | "DeleteTaskPushNotificationConfig" => {
self.handle_push_delete(params).await
}
"agent/getAuthenticatedExtendedCard" | "GetExtendedAgentCard" => {
self.handle_agent_card().await
}
_ => Err(A2aRpcError::MethodNotFound(method.to_string())),
}
}
async fn handle_send_message(&self, params: Value) -> Result<Value, A2aRpcError> {
let params: SendMessageParams = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let proposal = message_to_proposal(¶ms.message)?;
if let Some(existing_id) = params.message.task_id.clone() {
if let Some(record) = self.store.get(&existing_id).await {
if !record.task.status.state.is_terminal() {
return self
.continue_existing_task(&existing_id, params, proposal)
.await;
}
}
}
let context_id = params
.message
.context_id
.clone()
.unwrap_or_else(|| format!("ctx-{}", short_id()));
let task_id = format!("task-{}", short_id());
let mut history_msg = params.message.clone();
history_msg.task_id = Some(task_id.clone());
history_msg.context_id = Some(context_id.clone());
let initial_task = task_with_status(
task_id.clone(),
context_id.clone(),
TaskState::Submitted,
vec![history_msg.clone()],
vec![],
);
let initial_record = self.store.create(initial_task).await;
let blocking = params
.configuration
.as_ref()
.map(|c| c.blocking)
.unwrap_or(false);
if blocking {
let final_record = self.execute_proposal(&task_id, proposal).await?;
return serde_json::to_value(SendMessageResult::Task(final_record.task))
.map_err(|e| A2aRpcError::Internal(e.to_string()));
}
self.spawn_executor(task_id.clone(), proposal, true).await;
serde_json::to_value(SendMessageResult::Task(initial_record.task))
.map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn spawn_executor(
&self,
task_id: String,
proposal: car_ir::ActionProposal,
gate_on_cancel: bool,
) {
let cancel = tokio_util::sync::CancellationToken::new();
self.aborts.register(task_id.clone(), cancel.clone()).await;
let this = self.clone();
let id = task_id.clone();
tokio::spawn(async move {
if gate_on_cancel {
if let Some(record) = this.store.get(&id).await {
if record.cancel_requested {
return;
}
}
}
let _ = this.store.update_state(&id, TaskState::Working).await;
this.publish(&id, PublishKind::Status { final_event: false })
.await;
let result = this.runtime.execute_with_cancel(&proposal, &cancel).await;
let artifacts = action_results_to_artifacts(&result);
for artifact in &artifacts {
this.publish(&id, PublishKind::Artifact(artifact.clone()))
.await;
}
let _ = this.store.append_artifacts(&id, artifacts).await;
let _ = this.store.update_state(&id, a2a_state_for(&result)).await;
let agent_reply = build_agent_reply(&id, &id, &result);
let _ = this.store.append_history(&id, agent_reply).await;
this.aborts.clear(&id).await;
let preempted = this
.store
.get(&id)
.await
.map(|r| r.cancel_requested)
.unwrap_or(false);
if !preempted {
this.publish(&id, PublishKind::Status { final_event: true })
.await;
}
});
}
async fn continue_existing_task(
&self,
task_id: &str,
params: SendMessageParams,
proposal: car_ir::ActionProposal,
) -> Result<Value, A2aRpcError> {
let mut history_msg = params.message.clone();
history_msg.task_id = Some(task_id.to_string());
if history_msg.context_id.is_none() {
history_msg.context_id = self.store.get(task_id).await.map(|r| r.task.context_id);
}
let _ = self.store.append_history(task_id, history_msg).await;
let blocking = params
.configuration
.as_ref()
.map(|c| c.blocking)
.unwrap_or(false);
if blocking {
let final_record = self.execute_proposal(task_id, proposal).await?;
return serde_json::to_value(SendMessageResult::Task(final_record.task))
.map_err(|e| A2aRpcError::Internal(e.to_string()));
}
self.spawn_executor(task_id.to_string(), proposal, false)
.await;
let record = self
.store
.get(task_id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))?;
serde_json::to_value(SendMessageResult::Task(record.task))
.map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn execute_proposal(
&self,
task_id: &str,
proposal: car_ir::ActionProposal,
) -> Result<TaskRecord, A2aRpcError> {
self.store.update_state(task_id, TaskState::Working).await;
self.publish(task_id, PublishKind::Status { final_event: false })
.await;
let result = self.runtime.execute(&proposal).await;
let artifacts = action_results_to_artifacts(&result);
for artifact in &artifacts {
self.publish(task_id, PublishKind::Artifact(artifact.clone()))
.await;
}
self.store
.append_artifacts(task_id, artifacts)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))?;
let final_state = a2a_state_for(&result);
self.store
.update_state(task_id, final_state)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))?;
let agent_reply = build_agent_reply(task_id, task_id, &result);
self.store.append_history(task_id, agent_reply).await;
let preempted = self
.store
.get(task_id)
.await
.map(|r| r.cancel_requested)
.unwrap_or(false);
if !preempted {
self.publish(task_id, PublishKind::Status { final_event: true })
.await;
}
self.store
.get(task_id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(task_id.to_string()))
}
async fn handle_get_task(&self, params: Value) -> Result<Value, A2aRpcError> {
let params: GetTaskParams = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let mut record = self
.store
.get(¶ms.id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(params.id.clone()))?;
if let Some(limit) = params.history_length {
let lim = limit as usize;
if record.task.history.len() > lim {
let drop_count = record.task.history.len() - lim;
record.task.history.drain(..drop_count);
}
}
serde_json::to_value(record.task).map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn handle_list_tasks(&self, params: Value) -> Result<Value, A2aRpcError> {
let params: ListTasksParams = if params.is_null() {
ListTasksParams::default()
} else {
serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?
};
let records = self
.store
.list(ListFilter {
context_id: params.context_id,
state: params.state,
limit: params.limit,
})
.await;
let tasks: Vec<Task> = records.into_iter().map(|r| r.task).collect();
serde_json::to_value(ListTasksResult { tasks })
.map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn handle_cancel_task(&self, params: Value) -> Result<Value, A2aRpcError> {
#[derive(Deserialize)]
struct P {
id: String,
}
let p: P = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let record = self
.store
.request_cancel(&p.id)
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(p.id.clone()))?;
if record.cancel_requested {
self.aborts.abort(&p.id).await;
self.publish(&p.id, PublishKind::Status { final_event: true })
.await;
} else {
warn!(task = %p.id, "cancel ignored — already terminal");
}
serde_json::to_value(record.task).map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn handle_push_set(&self, params: Value) -> Result<Value, A2aRpcError> {
let params: PushNotificationConfigParams = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let config_id = format!("push-{}", short_id());
self.store
.add_push_config(¶ms.task_id, config_id.clone(), params.config.clone())
.await
.ok_or_else(|| A2aRpcError::TaskNotFound(params.task_id.clone()))?;
Ok(serde_json::json!({
"configId": config_id,
"config": params.config,
}))
}
async fn handle_push_get(&self, params: Value) -> Result<Value, A2aRpcError> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct P {
task_id: String,
config_id: String,
}
let p: P = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let cfg = self
.store
.get_push_config(&p.task_id, &p.config_id)
.await
.ok_or_else(|| {
A2aRpcError::PushConfigNotFound(p.config_id.clone(), p.task_id.clone())
})?;
serde_json::to_value(cfg).map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn handle_push_list(&self, params: Value) -> Result<Value, A2aRpcError> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct P {
task_id: String,
}
let p: P = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
let configs = self.store.list_push_configs(&p.task_id).await;
let entries: Vec<PushConfigEntry> = configs
.into_iter()
.map(|(id, config)| PushConfigEntry {
config_id: id,
config,
})
.collect();
serde_json::to_value(entries).map_err(|e| A2aRpcError::Internal(e.to_string()))
}
async fn handle_push_delete(&self, params: Value) -> Result<Value, A2aRpcError> {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct P {
task_id: String,
config_id: String,
}
let p: P = serde_json::from_value(params)
.map_err(|e| A2aRpcError::InvalidParams(e.to_string()))?;
self.store
.remove_push_config(&p.task_id, &p.config_id)
.await
.ok_or_else(|| {
A2aRpcError::PushConfigNotFound(p.config_id.clone(), p.task_id.clone())
})?;
Ok(serde_json::json!({ "deleted": true }))
}
async fn handle_agent_card(&self) -> Result<Value, A2aRpcError> {
let card = (self.card)();
serde_json::to_value(card).map_err(|e| A2aRpcError::Internal(e.to_string()))
}
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PushConfigEntry {
config_id: String,
config: PushNotificationConfig,
}
fn build_agent_reply(task_id: &str, context_id: &str, result: &car_ir::ProposalResult) -> Message {
Message {
message_id: format!("msg-{}", short_id()),
role: MessageRole::Agent,
parts: vec![Part::Text(TextPart {
text: summary_text(result),
metadata: HashMap::new(),
})],
task_id: Some(task_id.to_string()),
context_id: Some(context_id.to_string()),
metadata: HashMap::new(),
}
}
fn summary_text(result: &car_ir::ProposalResult) -> String {
let succeeded = result
.results
.iter()
.filter(|r| matches!(r.status, car_ir::ActionStatus::Succeeded))
.count();
let failed = result.results.len() - succeeded;
if result.results.is_empty() {
"Acknowledged.".to_string()
} else if failed == 0 {
format!("Completed {} action(s).", succeeded)
} else {
format!(
"{} succeeded, {} failed/skipped of {} action(s).",
succeeded,
failed,
result.results.len()
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::InMemoryTaskStore;
use crate::types::{AgentCapabilities, AgentInterface, AgentProvider, TransportProtocol};
use car_engine::Runtime;
use serde_json::json;
fn test_card() -> AgentCard {
AgentCard {
name: "CAR test".into(),
description: "test agent".into(),
url: "https://example.test".into(),
version: "1.0.0".into(),
protocol_version: "1.0".into(),
preferred_transport: Some("JSONRPC".into()),
provider: AgentProvider {
organization: "Parslee".into(),
url: None,
},
capabilities: AgentCapabilities::default(),
default_input_modes: vec!["text".into()],
default_output_modes: vec!["text".into()],
skills: vec![],
documentation_url: None,
icon_url: None,
supported_interfaces: vec![],
additional_interfaces: vec![AgentInterface {
url: "wss://example.test".into(),
protocol_binding: "JSONRPC".into(),
transport: Some(TransportProtocol::JsonRpc),
tenant: None,
protocol_version: "1.0".into(),
}],
security_schemes: HashMap::new(),
supports_authenticated_extended_card: false,
security_requirements: vec![],
signatures: vec![],
}
}
fn dispatcher() -> A2aDispatcher {
let runtime = Arc::new(Runtime::new());
let store = Arc::new(InMemoryTaskStore::new());
let card: Arc<AgentCardSource> = Arc::new(test_card);
A2aDispatcher::new(runtime, store, card)
}
#[tokio::test]
async fn agent_card_round_trips_via_dispatch() {
let d = dispatcher();
let v = d
.dispatch("agent/getAuthenticatedExtendedCard", Value::Null)
.await
.expect("ok");
assert_eq!(v["name"], "CAR test");
assert_eq!(v["additionalInterfaces"][0]["transport"], "JSONRPC");
}
#[tokio::test]
async fn unknown_method_returns_method_not_found() {
let d = dispatcher();
let err = d.dispatch("foo/bar", Value::Null).await.unwrap_err();
assert_eq!(err.code(), -32601);
}
#[tokio::test]
async fn message_send_blocking_runs_proposal() {
let d = dispatcher();
let params = json!({
"message": {
"messageId": "m-1",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "missing.tool", "parameters": {} }
}]
},
"configuration": { "blocking": true }
});
let v = d.dispatch("message/send", params).await.expect("ok");
assert_eq!(v["id"].as_str().unwrap().starts_with("task-"), true);
let state: TaskState =
serde_json::from_value(v["status"]["state"].clone()).unwrap();
assert!(state.is_terminal(), "expected terminal state, got {:?}", state);
}
#[tokio::test]
async fn text_only_message_completes_not_fails() {
let d = dispatcher();
let send = json!({
"message": {
"messageId": "m-x",
"role": "user",
"parts": [{ "kind": "text", "text": "hi" }]
},
"configuration": { "blocking": true }
});
let task = d.dispatch("message/send", send).await.expect("ok");
assert_eq!(task["status"]["state"], "completed");
}
#[tokio::test]
async fn tasks_get_returns_known_task() {
let d = dispatcher();
let send = json!({
"message": {
"messageId": "m-x",
"role": "user",
"parts": [{ "kind": "text", "text": "hi" }]
},
"configuration": { "blocking": true }
});
let task = d.dispatch("message/send", send).await.expect("ok");
let task_id = task["id"].as_str().unwrap().to_string();
let fetched = d
.dispatch("tasks/get", json!({ "id": task_id }))
.await
.expect("ok");
assert_eq!(fetched["id"], task["id"]);
}
#[tokio::test]
async fn tasks_cancel_marks_canceled() {
let d = dispatcher();
let task = task_with_status(
"t-cancel".into(),
"ctx".into(),
TaskState::Submitted,
vec![],
vec![],
);
d.store.create(task).await;
let v = d
.dispatch("tasks/cancel", json!({ "id": "t-cancel" }))
.await
.expect("ok");
assert_eq!(v["status"]["state"], "canceled");
}
#[tokio::test]
async fn cancel_after_terminal_does_not_double_publish() {
let d = dispatcher();
let task = task_with_status(
"t-done".into(),
"ctx".into(),
TaskState::Completed,
vec![],
vec![],
);
d.store.create(task).await;
let mut rx = d.subscribe("t-done").await;
let _ = d
.dispatch("tasks/cancel", json!({ "id": "t-done" }))
.await
.expect("ok");
let res = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv()).await;
assert!(res.is_err(), "expected no SSE frame, got {:?}", res);
}
#[tokio::test]
async fn subscribe_receives_status_updates_during_async_send() {
let d = dispatcher();
let send = json!({
"message": {
"messageId": "m-async",
"role": "user",
"parts": [{ "kind": "text", "text": "go" }]
}
});
let task = d.dispatch("message/send", send).await.expect("ok");
let task_id = task["id"].as_str().unwrap().to_string();
let mut rx = d.subscribe(&task_id).await;
let mut saw_final = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
match tokio::time::timeout_at(deadline, rx.recv()).await {
Ok(Ok(StreamEvent::StatusUpdate(u))) => {
if u.final_event {
saw_final = true;
break;
}
}
Ok(Ok(_)) => {}
Ok(Err(_)) | Err(_) => break,
}
}
assert!(saw_final, "expected a final status-update on the stream");
}
#[tokio::test]
async fn push_config_lifecycle() {
let d = dispatcher();
d.store
.create(task_with_status(
"t-push".into(),
"ctx".into(),
TaskState::Working,
vec![],
vec![],
))
.await;
let set = d
.dispatch(
"tasks/pushNotificationConfig/set",
json!({
"taskId": "t-push",
"config": { "url": "https://example.com/hook" }
}),
)
.await
.expect("ok");
let config_id = set["configId"].as_str().unwrap().to_string();
let listed = d
.dispatch(
"tasks/pushNotificationConfig/list",
json!({ "taskId": "t-push" }),
)
.await
.expect("ok");
assert_eq!(listed.as_array().unwrap().len(), 1);
let _ = d
.dispatch(
"tasks/pushNotificationConfig/delete",
json!({ "taskId": "t-push", "configId": config_id }),
)
.await
.expect("ok");
}
}