use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use thiserror::Error;
#[async_trait]
pub trait PollerHost: Send + Sync + 'static {
async fn broker_publish(&self, topic: String, payload: Vec<u8>) -> Result<(), HostError>;
async fn credentials_get(&self, channel: String) -> Result<Value, HostError>;
async fn log(&self, level: LogLevel, message: String, fields: Value) -> Result<(), HostError>;
async fn metric_inc(&self, name: String, labels: Value) -> Result<(), HostError>;
async fn llm_invoke(&self, request: LlmInvokeRequest) -> Result<LlmInvokeResponse, HostError>;
}
#[derive(Debug, Error)]
pub enum HostError {
#[error("topic '{0}' not in plugin publish allowlist")]
TopicNotAllowed(String),
#[error("no credentials bound for channel '{0}'")]
CredentialsMissing(String),
#[error("broker unavailable: {0}")]
BrokerUnavailable(String),
#[error("daemon RPC timed out after {0:?}")]
Timeout(Duration),
#[error("rpc error: code={code} msg={message}")]
Rpc { code: i32, message: String },
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl HostError {
pub fn into_poller_kind(self) -> HostErrorKind {
match self {
Self::TopicNotAllowed(_) | Self::CredentialsMissing(_) => HostErrorKind::Permanent,
Self::BrokerUnavailable(_) | Self::Timeout(_) | Self::Other(_) => {
HostErrorKind::Transient
}
Self::Rpc { code, .. } => match code {
-32002 => HostErrorKind::Permanent,
-32602 => HostErrorKind::Config,
_ => HostErrorKind::Transient,
},
}
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HostErrorKind {
Transient,
Permanent,
Config,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TickAck {
pub next_cursor: Option<Vec<u8>>,
pub next_interval_hint: Option<Duration>,
pub metrics: Option<TickMetrics>,
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TickMetrics {
pub items_seen: u32,
pub items_dispatched: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmInvokeRequest {
pub provider: String,
#[serde(default)]
pub model: String,
pub messages: Vec<LlmMessage>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_tokens: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub temperature: Option<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmMessage {
pub role: String,
pub content: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmInvokeResponse {
pub content: String,
pub model_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub usage: Option<LlmUsage>,
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LlmUsage {
pub input_tokens: u32,
pub output_tokens: u32,
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_send_sync<T: Send + Sync + ?Sized>() {}
#[test]
fn poller_host_is_object_safe_and_send_sync() {
assert_send_sync::<dyn PollerHost>();
}
#[test]
fn tick_ack_default_is_empty() {
let a = TickAck::default();
assert!(a.next_cursor.is_none());
assert!(a.next_interval_hint.is_none());
assert!(a.metrics.is_none());
}
#[test]
fn host_error_classify_topic_not_allowed_is_permanent() {
let e = HostError::TopicNotAllowed("plugin.outbound.x".into());
assert_eq!(e.into_poller_kind(), HostErrorKind::Permanent);
}
#[test]
fn host_error_classify_creds_missing_is_permanent() {
let e = HostError::CredentialsMissing("google".into());
assert_eq!(e.into_poller_kind(), HostErrorKind::Permanent);
}
#[test]
fn host_error_classify_timeout_is_transient() {
let e = HostError::Timeout(Duration::from_secs(5));
assert_eq!(e.into_poller_kind(), HostErrorKind::Transient);
}
#[test]
fn host_error_classify_rpc_code_routes_correctly() {
let permanent = HostError::Rpc {
code: -32002,
message: "revoked".into(),
};
assert_eq!(permanent.into_poller_kind(), HostErrorKind::Permanent);
let config = HostError::Rpc {
code: -32602,
message: "missing field".into(),
};
assert_eq!(config.into_poller_kind(), HostErrorKind::Config);
let other = HostError::Rpc {
code: -1,
message: "unknown".into(),
};
assert_eq!(other.into_poller_kind(), HostErrorKind::Transient);
}
#[test]
fn log_level_serializes_lowercase() {
assert_eq!(serde_json::to_string(&LogLevel::Warn).unwrap(), "\"warn\"");
}
#[test]
fn llm_invoke_request_round_trips_serde() {
let req = LlmInvokeRequest {
provider: "anthropic".into(),
model: "claude-haiku-4-5".into(),
messages: vec![
LlmMessage {
role: "system".into(),
content: "You are helpful.".into(),
},
LlmMessage {
role: "user".into(),
content: "Hi".into(),
},
],
max_tokens: Some(128),
temperature: Some(0.7),
};
let json = serde_json::to_string(&req).unwrap();
let back: LlmInvokeRequest = serde_json::from_str(&json).unwrap();
assert_eq!(back.provider, "anthropic");
assert_eq!(back.messages.len(), 2);
assert_eq!(back.max_tokens, Some(128));
}
#[test]
fn llm_invoke_response_omits_none_usage() {
let resp = LlmInvokeResponse {
content: "ok".into(),
model_id: "claude-haiku-4-5".into(),
usage: None,
};
let json = serde_json::to_string(&resp).unwrap();
assert!(
!json.contains("usage"),
"None usage should be omitted: {json}"
);
}
}