use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::error::SdkError;
pub type PortId = String;
pub type PortValue = serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct OAuthToken {
pub access_token: String,
pub refresh_token: Option<String>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
pub token_type: Option<String>,
pub scope: Option<String>,
}
impl OAuthToken {
pub fn bearer(access_token: impl Into<String>) -> Self {
Self {
access_token: access_token.into(),
refresh_token: None,
expires_at: None,
token_type: Some("Bearer".to_string()),
scope: None,
}
}
}
#[async_trait]
pub trait StateStore: Send + Sync + 'static {
async fn append(&self, entry: PortValue) -> Result<PortId, SdkError>;
async fn load(&self, id: &PortId) -> Result<Option<PortValue>, SdkError>;
async fn list(&self, prefix: &str) -> Result<Vec<PortId>, SdkError>;
async fn delete(&self, id: &PortId) -> Result<(), SdkError>;
async fn load_all(&self, _prefix: &str) -> Result<Vec<(PortId, PortValue)>, SdkError> {
Ok(Vec::new())
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopStateStore;
#[async_trait]
impl StateStore for NoopStateStore {
async fn append(&self, _entry: PortValue) -> Result<PortId, SdkError> {
Err(SdkError::PortNotConfigured {
port: "StateStore",
})
}
async fn load(&self, _id: &PortId) -> Result<Option<PortValue>, SdkError> {
Ok(None)
}
async fn list(&self, _prefix: &str) -> Result<Vec<PortId>, SdkError> {
Ok(Vec::new())
}
async fn delete(&self, _id: &PortId) -> Result<(), SdkError> {
Ok(())
}
}
#[async_trait]
pub trait ConfigStore: Send + Sync + 'static {
fn get(&self, key: &str) -> Result<Option<PortValue>, SdkError>;
fn set(&self, key: &str, value: PortValue) -> Result<(), SdkError>;
fn list(&self) -> Result<Vec<(String, PortValue)>, SdkError>;
fn source(&self, _key: &str) -> Option<String> {
None
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopConfigStore;
#[async_trait]
impl ConfigStore for NoopConfigStore {
fn get(&self, _key: &str) -> Result<Option<PortValue>, SdkError> {
Ok(None)
}
fn set(&self, _key: &str, _value: PortValue) -> Result<(), SdkError> {
Ok(())
}
fn list(&self) -> Result<Vec<(String, PortValue)>, SdkError> {
Ok(Vec::new())
}
}
#[async_trait]
pub trait AuthProvider: Send + Sync + 'static {
async fn get_api_key(&self, provider: &str) -> Result<Option<String>, SdkError>;
async fn set_api_key(&self, provider: &str, key: &str) -> Result<(), SdkError>;
async fn delete_api_key(&self, provider: &str) -> Result<(), SdkError>;
async fn get_oauth(&self, provider: &str) -> Result<Option<OAuthToken>, SdkError>;
async fn set_oauth(&self, provider: &str, token: OAuthToken) -> Result<(), SdkError>;
async fn list_providers(&self) -> Result<Vec<String>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopAuthProvider;
#[async_trait]
impl AuthProvider for NoopAuthProvider {
async fn get_api_key(&self, _provider: &str) -> Result<Option<String>, SdkError> {
Ok(None)
}
async fn set_api_key(&self, _provider: &str, _key: &str) -> Result<(), SdkError> {
Err(SdkError::PortNotConfigured { port: "AuthProvider" })
}
async fn delete_api_key(&self, _provider: &str) -> Result<(), SdkError> {
Ok(())
}
async fn get_oauth(&self, _provider: &str) -> Result<Option<OAuthToken>, SdkError> {
Ok(None)
}
async fn set_oauth(&self, _provider: &str, _token: OAuthToken) -> Result<(), SdkError> {
Err(SdkError::PortNotConfigured { port: "AuthProvider" })
}
async fn list_providers(&self) -> Result<Vec<String>, SdkError> {
Ok(Vec::new())
}
}
pub type EventTopic = String;
pub type EventPayload = serde_json::Value;
#[async_trait]
pub trait EventBus: Send + Sync + 'static {
async fn publish(&self, topic: &EventTopic, payload: EventPayload) -> Result<(), SdkError>;
async fn subscribe(&self, topic: &EventTopic) -> Result<SubscriptionHandle, SdkError>;
}
pub struct SubscriptionHandle {
_unsubscribe: Option<Box<dyn FnOnce() + Send + Sync>>,
receiver: Option<tokio::sync::mpsc::Receiver<(EventTopic, EventPayload)>>,
}
impl SubscriptionHandle {
pub async fn recv(&mut self) -> Option<(EventTopic, EventPayload)> {
match &mut self.receiver {
Some(rx) => rx.recv().await,
None => None,
}
}
}
impl std::fmt::Debug for SubscriptionHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SubscriptionHandle")
.field("active", &self.receiver.is_some())
.finish()
}
}
impl SubscriptionHandle {
pub fn from_receiver(rx: tokio::sync::mpsc::Receiver<(EventTopic, EventPayload)>) -> Self {
Self {
_unsubscribe: None,
receiver: Some(rx),
}
}
}
pub struct InMemoryEventBus {
tx: tokio::sync::broadcast::Sender<(EventTopic, EventPayload)>,
}
impl InMemoryEventBus {
pub fn new(capacity: usize) -> Arc<Self> {
let (tx, _) = tokio::sync::broadcast::channel(capacity);
Arc::new(Self { tx })
}
}
#[async_trait]
impl EventBus for InMemoryEventBus {
async fn publish(&self, topic: &EventTopic, payload: EventPayload) -> Result<(), SdkError> {
let _ = self.tx.send((topic.clone(), payload));
Ok(())
}
async fn subscribe(&self, _topic: &EventTopic) -> Result<SubscriptionHandle, SdkError> {
let mut rx = self.tx.subscribe();
let (tx, rx2) = tokio::sync::mpsc::channel(64);
let _ = tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if tx.send(event).await.is_err() {
break;
}
}
});
Ok(SubscriptionHandle {
_unsubscribe: None,
receiver: Some(rx2),
})
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopEventBus;
#[async_trait]
impl EventBus for NoopEventBus {
async fn publish(&self, _topic: &EventTopic, _payload: EventPayload) -> Result<(), SdkError> {
Ok(())
}
async fn subscribe(&self, _topic: &EventTopic) -> Result<SubscriptionHandle, SdkError> {
Ok(SubscriptionHandle {
_unsubscribe: None,
receiver: None,
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SkillMeta {
pub name: String,
pub description: String,
pub path: PathBuf,
pub version: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Skill {
pub meta: SkillMeta,
pub body: String,
}
#[async_trait]
pub trait SkillLoader: Send + Sync + 'static {
async fn list(&self) -> Result<Vec<SkillMeta>, SdkError>;
async fn load(&self, name: &str) -> Result<Option<Skill>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopSkillLoader;
#[async_trait]
impl SkillLoader for NoopSkillLoader {
async fn list(&self) -> Result<Vec<SkillMeta>, SdkError> {
Ok(Vec::new())
}
async fn load(&self, _name: &str) -> Result<Option<Skill>, SdkError> {
Ok(None)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Persona {
pub name: String,
pub system_prompt: String,
pub preferred_model: Option<String>,
pub allowed_tools: Option<Vec<String>>,
}
#[async_trait]
pub trait PersonaProvider: Send + Sync + 'static {
async fn list(&self) -> Result<Vec<Persona>, SdkError>;
async fn get(&self, name: &str) -> Result<Option<Persona>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopPersonaProvider;
#[async_trait]
impl PersonaProvider for NoopPersonaProvider {
async fn list(&self) -> Result<Vec<Persona>, SdkError> {
Ok(Vec::new())
}
async fn get(&self, _name: &str) -> Result<Option<Persona>, SdkError> {
Ok(None)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallRequest {
pub tool: String,
pub action: String,
pub cwd: PathBuf,
pub subject: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum AccessDecision {
Allow,
AllowWithAudit,
Deny { reason: String },
RequireApproval { reason: String },
}
#[async_trait]
pub trait AccessGate: Send + Sync + 'static {
async fn check(&self, request: &ToolCallRequest) -> Result<AccessDecision, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct AllowAllAccessGate;
#[async_trait]
impl AccessGate for AllowAllAccessGate {
async fn check(&self, _request: &ToolCallRequest) -> Result<AccessDecision, SdkError> {
Ok(AccessDecision::Allow)
}
}
#[async_trait]
pub trait CapabilityResolver: Send + Sync + 'static {
async fn visible_tools(&self, subject: &str) -> Result<Vec<String>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct EmptyCapabilityResolver;
#[async_trait]
impl CapabilityResolver for EmptyCapabilityResolver {
async fn visible_tools(&self, _subject: &str) -> Result<Vec<String>, SdkError> {
Ok(Vec::new())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryEntry {
pub id: String,
pub subject: String,
pub kind: String,
pub embedding: Option<Vec<f32>>,
pub content: PortValue,
pub created_at: chrono::DateTime<chrono::Utc>,
}
#[async_trait]
pub trait MemoryStore: Send + Sync + 'static {
async fn put(&self, entry: MemoryEntry) -> Result<(), SdkError>;
async fn search(&self, _query: &[f32], _k: usize) -> Result<Vec<MemoryEntry>, SdkError> {
Ok(Vec::new())
}
async fn list(&self, subject: &str) -> Result<Vec<MemoryEntry>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopMemoryStore;
#[async_trait]
impl MemoryStore for NoopMemoryStore {
async fn put(&self, _entry: MemoryEntry) -> Result<(), SdkError> {
Err(SdkError::PortNotConfigured { port: "MemoryStore" })
}
async fn list(&self, _subject: &str) -> Result<Vec<MemoryEntry>, SdkError> {
Ok(Vec::new())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronJob {
pub id: String,
pub schedule: String,
pub action: String,
pub payload: Option<PortValue>,
}
#[async_trait]
pub trait CronScheduler: Send + Sync + 'static {
async fn register(&self, job: CronJob) -> Result<(), SdkError>;
async fn unregister(&self, id: &str) -> Result<(), SdkError>;
async fn list(&self) -> Result<Vec<CronJob>, SdkError>;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopCronScheduler;
#[async_trait]
impl CronScheduler for NoopCronScheduler {
async fn register(&self, _job: CronJob) -> Result<(), SdkError> {
Err(SdkError::PortNotConfigured { port: "CronScheduler" })
}
async fn unregister(&self, _id: &str) -> Result<(), SdkError> {
Ok(())
}
async fn list(&self) -> Result<Vec<CronJob>, SdkError> {
Ok(Vec::new())
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
pub cpu_percent: f32,
pub memory_bytes: u64,
pub disk_bytes: u64,
pub active_agents: usize,
pub tokens_consumed: u64,
}
#[async_trait]
pub trait ResourceMonitor: Send + Sync + 'static {
async fn snapshot(&self) -> Result<ResourceUsage, SdkError>;
async fn is_over_budget(&self) -> Result<bool, SdkError> {
Ok(false)
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopResourceMonitor;
#[async_trait]
impl ResourceMonitor for NoopResourceMonitor {
async fn snapshot(&self) -> Result<ResourceUsage, SdkError> {
Ok(ResourceUsage::default())
}
}
#[derive(Clone)]
pub struct PortRegistry {
pub state: Arc<dyn StateStore>,
pub config: Arc<dyn ConfigStore>,
pub auth: Arc<dyn AuthProvider>,
pub event_bus: Arc<dyn EventBus>,
pub skills: Arc<dyn SkillLoader>,
pub personas: Arc<dyn PersonaProvider>,
pub access: Arc<dyn AccessGate>,
pub capabilities: Arc<dyn CapabilityResolver>,
pub memory: Arc<dyn MemoryStore>,
pub cron: Arc<dyn CronScheduler>,
pub resources: Arc<dyn ResourceMonitor>,
}
impl std::fmt::Debug for PortRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PortRegistry")
.field("state", &"<dyn StateStore>")
.field("config", &"<dyn ConfigStore>")
.field("auth", &"<dyn AuthProvider>")
.field("event_bus", &"<dyn EventBus>")
.field("skills", &"<dyn SkillLoader>")
.field("personas", &"<dyn PersonaProvider>")
.field("access", &"<dyn AccessGate>")
.field("capabilities", &"<dyn CapabilityResolver>")
.field("memory", &"<dyn MemoryStore>")
.field("cron", &"<dyn CronScheduler>")
.field("resources", &"<dyn ResourceMonitor>")
.finish()
}
}
impl Default for PortRegistry {
fn default() -> Self {
Self::noop()
}
}
impl PortRegistry {
pub fn noop() -> Self {
Self {
state: Arc::new(NoopStateStore),
config: Arc::new(NoopConfigStore),
auth: Arc::new(NoopAuthProvider),
event_bus: Arc::new(NoopEventBus),
skills: Arc::new(NoopSkillLoader),
personas: Arc::new(NoopPersonaProvider),
access: Arc::new(AllowAllAccessGate),
capabilities: Arc::new(EmptyCapabilityResolver),
memory: Arc::new(NoopMemoryStore),
cron: Arc::new(NoopCronScheduler),
resources: Arc::new(NoopResourceMonitor),
}
}
pub async fn from_directory(_dir: &Path) -> Self {
Self::noop()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[tokio::test]
async fn noop_state_store_load_returns_none() {
let s = NoopStateStore;
assert!(s.load(&"x".into()).await.unwrap().is_none());
assert!(s.list("").await.unwrap().is_empty());
}
#[tokio::test]
async fn noop_state_store_append_errors() {
let s = NoopStateStore;
let err = s.append(json!({})).await.unwrap_err();
assert!(matches!(err, SdkError::PortNotConfigured { port: "StateStore" }));
}
#[test]
fn noop_config_get_returns_none() {
let c = NoopConfigStore;
assert!(c.get("any").unwrap().is_none());
assert!(c.list().unwrap().is_empty());
}
#[tokio::test]
async fn noop_auth_get_api_key_returns_none() {
let a = NoopAuthProvider;
assert!(a.get_api_key("anthropic").await.unwrap().is_none());
assert!(a.list_providers().await.unwrap().is_empty());
}
#[tokio::test]
async fn in_memory_event_bus_round_trip() {
let bus = InMemoryEventBus::new(8);
bus.publish(&"test".to_string(), json!({"hello": "world"}))
.await
.unwrap();
let mut sub = bus.subscribe(&"test".to_string()).await.unwrap();
bus.publish(&"test".to_string(), json!({"k": 1})).await.unwrap();
let (topic, payload) = sub.recv().await.unwrap();
assert_eq!(topic, "test");
assert_eq!(payload, json!({"k": 1}));
}
#[tokio::test]
async fn noop_event_bus_publish_succeeds_but_subscribes_return_none() {
let bus = NoopEventBus;
bus.publish(&"x".to_string(), json!({})).await.unwrap();
let mut sub = bus.subscribe(&"x".to_string()).await.unwrap();
assert!(sub.recv().await.is_none());
}
#[test]
fn default_registry_is_noop() {
let reg = PortRegistry::default();
assert!(Arc::strong_count(®.state) >= 1);
}
#[test]
fn oauth_token_bearer_constructor() {
let t = OAuthToken::bearer("abc");
assert_eq!(t.access_token, "abc");
assert_eq!(t.token_type.as_deref(), Some("Bearer"));
}
}
pub mod fs;
pub mod inmem;