use anyhow::Result;
use async_trait::async_trait;
use capnweb_core::{CapId, RpcError};
use capnweb_server::{RpcTarget, Server, ServerConfig};
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::{info, warn};
/// Counter capability whose value is kept in memory behind a shared mutex.
#[derive(Debug)]
struct CounterService {
    /// Current counter value, guarded by a mutex.
    count: Arc<Mutex<i64>>,
}

impl CounterService {
    /// Builds a counter that starts at zero.
    fn new() -> Self {
        let initial: i64 = 0;
        let count = Arc::new(Mutex::new(initial));
        Self { count }
    }
}
impl CounterService {
    /// Locks the counter, reporting a poisoned mutex as an internal RPC error.
    fn lock_count(&self) -> Result<std::sync::MutexGuard<'_, i64>, RpcError> {
        self.count
            .lock()
            .map_err(|e| RpcError::internal(format!("Counter lock poisoned: {}", e)))
    }
}

#[async_trait]
impl RpcTarget for CounterService {
    /// Dispatches the counter methods `increment`, `decrement`, `get` and `reset`.
    ///
    /// Every successful call returns `{ "count": <value> }`. Arguments are ignored.
    ///
    /// # Errors
    ///
    /// * internal error if the counter mutex is poisoned,
    /// * bad-request error if `increment`/`decrement` would leave the `i64` range,
    /// * not-found error for any other method name.
    async fn call(&self, method: &str, _args: Vec<Value>) -> Result<Value, RpcError> {
        match method {
            "increment" => {
                let mut count = self.lock_count()?;
                // Checked arithmetic: a plain `+= 1` would panic while holding the lock in
                // debug builds (poisoning it) and silently wrap around in release builds.
                let next = (*count).checked_add(1).ok_or_else(|| {
                    RpcError::bad_request("Counter overflow: cannot increment past i64::MAX")
                })?;
                *count = next;
                Ok(json!({ "count": *count }))
            }
            "decrement" => {
                let mut count = self.lock_count()?;
                let next = (*count).checked_sub(1).ok_or_else(|| {
                    RpcError::bad_request("Counter underflow: cannot decrement past i64::MIN")
                })?;
                *count = next;
                Ok(json!({ "count": *count }))
            }
            "get" => {
                let count = self.lock_count()?;
                Ok(json!({ "count": *count }))
            }
            "reset" => {
                let mut count = self.lock_count()?;
                *count = 0;
                Ok(json!({ "count": 0 }))
            }
            _ => Err(RpcError::not_found(format!("Unknown method: {}", method))),
        }
    }
}
#[derive(Debug)]
struct KeyValueStore {
store: Arc<Mutex<HashMap<String, Value>>>,
}
impl KeyValueStore {
fn new() -> Self {
Self {
store: Arc::new(Mutex::new(HashMap::new())),
}
}
}
#[async_trait]
impl RpcTarget for KeyValueStore {
async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
match method {
"get" => {
if args.is_empty() {
return Err(RpcError::bad_request("get requires a key"));
}
let key = args[0]
.as_str()
.ok_or_else(|| RpcError::bad_request("Key must be a string"))?;
let store = self
.store
.lock()
.map_err(|e| RpcError::internal(format!("Store lock poisoned: {}", e)))?;
match store.get(key) {
Some(value) => Ok(json!({ "value": value })),
None => Ok(json!({ "value": null })),
}
}
"set" => {
if args.len() < 2 {
return Err(RpcError::bad_request("set requires key and value"));
}
let key = args[0]
.as_str()
.ok_or_else(|| RpcError::bad_request("Key must be a string"))?;
let mut store = self
.store
.lock()
.map_err(|e| RpcError::internal(format!("Store lock poisoned: {}", e)))?;
store.insert(key.to_string(), args[1].clone());
Ok(json!({ "success": true }))
}
"delete" => {
if args.is_empty() {
return Err(RpcError::bad_request("delete requires a key"));
}
let key = args[0]
.as_str()
.ok_or_else(|| RpcError::bad_request("Key must be a string"))?;
let mut store = self
.store
.lock()
.map_err(|e| RpcError::internal(format!("Store lock poisoned: {}", e)))?;
let existed = store.remove(key).is_some();
Ok(json!({ "deleted": existed }))
}
"list" => {
let store = self
.store
.lock()
.map_err(|e| RpcError::internal(format!("Store lock poisoned: {}", e)))?;
let keys: Vec<String> = store.keys().cloned().collect();
Ok(json!({ "keys": keys }))
}
"clear" => {
let mut store = self
.store
.lock()
.map_err(|e| RpcError::internal(format!("Store lock poisoned: {}", e)))?;
let count = store.len();
store.clear();
Ok(json!({ "cleared": count }))
}
_ => Err(RpcError::not_found(format!("Unknown method: {}", method))),
}
}
}
#[derive(Debug)]
struct TimeService;
#[async_trait]
impl RpcTarget for TimeService {
async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
match method {
"now" => Ok(json!({
"timestamp": chrono::Utc::now().to_rfc3339(),
"unix": chrono::Utc::now().timestamp(),
})),
"delay" => {
let delay_ms = args.first().and_then(|v| v.as_u64()).unwrap_or(1000);
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
Ok(json!({
"delayed": delay_ms,
"timestamp": chrono::Utc::now().to_rfc3339(),
}))
}
"format" => {
let timestamp = args
.first()
.and_then(|v| v.as_i64())
.ok_or_else(|| RpcError::bad_request("format requires a unix timestamp"))?;
use chrono::DateTime;
let dt = DateTime::from_timestamp(timestamp, 0)
.ok_or_else(|| RpcError::bad_request("Invalid timestamp"))?;
Ok(json!({
"formatted": dt.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
"iso": dt.to_rfc3339(),
}))
}
_ => Err(RpcError::not_found(format!("Unknown method: {}", method))),
}
}
}
#[derive(Debug)]
struct MathService;
#[async_trait]
impl RpcTarget for MathService {
async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
match method {
"fibonacci" => {
let n = args
.first()
.and_then(|v| v.as_u64())
.ok_or_else(|| RpcError::bad_request("fibonacci requires a number"))?;
if n > 93 {
return Err(RpcError::bad_request("fibonacci input too large (max 93)"));
}
let result = fibonacci(n);
Ok(json!({ "result": result, "n": n }))
}
"factorial" => {
let n = args
.first()
.and_then(|v| v.as_u64())
.ok_or_else(|| RpcError::bad_request("factorial requires a number"))?;
if n > 20 {
return Err(RpcError::bad_request("factorial input too large (max 20)"));
}
let result = factorial(n);
Ok(json!({ "result": result, "n": n }))
}
"isPrime" => {
let n = args
.first()
.and_then(|v| v.as_u64())
.ok_or_else(|| RpcError::bad_request("isPrime requires a number"))?;
let result = is_prime(n);
Ok(json!({ "isPrime": result, "n": n }))
}
"sqrt" => {
let n = args
.first()
.and_then(|v| v.as_f64())
.ok_or_else(|| RpcError::bad_request("sqrt requires a number"))?;
if n < 0.0 {
return Err(RpcError::bad_request("sqrt requires non-negative number"));
}
Ok(json!({ "result": n.sqrt() }))
}
_ => Err(RpcError::not_found(format!("Unknown method: {}", method))),
}
}
}
/// Returns the `n`-th Fibonacci number, with `F(0) = 0` and `F(1) = 1`.
///
/// The result fits in a `u64` only for `n <= 93`; larger inputs overflow the addition.
fn fibonacci(n: u64) -> u64 {
    if n == 0 {
        return 0;
    }
    // Start from (F(0), F(1)) and advance once per index in 2..=n, so nothing
    // beyond F(n) is ever computed.
    let (_, current) = (2..=n).fold((0u64, 1u64), |(previous, current), _| {
        (current, previous + current)
    });
    current
}
/// Returns `n!`, with `0! = 1`.
///
/// The result fits in a `u64` only for `n <= 20`; larger inputs overflow the product.
fn factorial(n: u64) -> u64 {
    let mut product: u64 = 1;
    // Multiplying by 1 is a no-op, so the loop starts at 2.
    for factor in 2..=n {
        product *= factor;
    }
    product
}
/// Reports whether `n` is prime.
///
/// Uses the Miller–Rabin test with the first twelve primes as witnesses, which is
/// deterministic (no false positives) for every 64-bit input. This replaces trial
/// division up to `sqrt(n)`, which needed billions of iterations for large primes and
/// relied on `f64` rounding for inputs above 2^53.
fn is_prime(n: u64) -> bool {
    /// Witness bases; also used for the initial small-prime trial division.
    const WITNESSES: [u64; 12] = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37];

    /// Computes `(a * b) mod m` without overflow by widening to 128 bits.
    fn mul_mod(a: u64, b: u64, m: u64) -> u64 {
        // The remainder is strictly less than `m`, so narrowing back to u64 is lossless.
        ((u128::from(a) * u128::from(b)) % u128::from(m)) as u64
    }

    /// Computes `base^exp mod m` by square-and-multiply.
    fn pow_mod(mut base: u64, mut exp: u64, m: u64) -> u64 {
        let mut acc = 1 % m;
        base %= m;
        while exp > 0 {
            if exp & 1 == 1 {
                acc = mul_mod(acc, base, m);
            }
            base = mul_mod(base, base, m);
            exp >>= 1;
        }
        acc
    }

    if n < 2 {
        return false;
    }
    // Settle every multiple of a witness up front. Afterwards `n` is odd and larger
    // than 37, so each witness is a valid base in 2..n-1.
    for &p in &WITNESSES {
        if n % p == 0 {
            return n == p;
        }
    }

    // Decompose n - 1 as d * 2^s with d odd.
    let s = (n - 1).trailing_zeros();
    let d = (n - 1) >> s;

    'witness: for &a in &WITNESSES {
        let mut x = pow_mod(a, d, n);
        if x == 1 || x == n - 1 {
            continue;
        }
        // Square up to s - 1 more times, looking for -1 (mod n).
        for _ in 1..s {
            x = mul_mod(x, x, n);
            if x == n - 1 {
                continue 'witness;
            }
        }
        // `a` proves that `n` is composite.
        return false;
    }
    true
}
#[derive(Debug)]
struct MainService;
#[async_trait]
impl RpcTarget for MainService {
async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
match method {
"getCapability" => {
let id_value = args.first().ok_or_else(|| {
RpcError::bad_request("getCapability requires a capability ID argument")
})?;
let id_number = id_value
.as_number()
.ok_or_else(|| RpcError::bad_request("Capability ID must be a number"))?;
if !id_number.is_i64() && !id_number.is_u64() {
return Err(RpcError::bad_request("Capability ID must be an integer"));
}
let cap_id = if let Some(i64_val) = id_number.as_i64() {
if i64_val < 0 {
return Err(RpcError::bad_request("Capability ID must be non-negative"));
}
i64_val as u64
} else if let Some(u64_val) = id_number.as_u64() {
u64_val
} else {
return Err(RpcError::bad_request(
"Capability ID value is out of valid range",
));
};
match cap_id {
0..=4 => {
Ok(json!({
"$capnweb": {
"import_id": cap_id
}
}))
}
_ => Err(RpcError::not_found(format!(
"Capability {} not found",
cap_id
))),
}
}
"listServices" => Ok(json!({
"services": [
{ "id": 1, "name": "counter", "description": "Stateful counter service" },
{ "id": 2, "name": "keyvalue", "description": "Key-value store" },
{ "id": 3, "name": "time", "description": "Time and delay operations" },
{ "id": 4, "name": "math", "description": "Mathematical operations" },
]
})),
"health" => Ok(json!({
"status": "healthy",
"timestamp": chrono::Utc::now().to_rfc3339(),
})),
_ => Err(RpcError::not_found(format!("Unknown method: {}", method))),
}
}
}
/// Starts the example server: sets up logging, registers the five example
/// capabilities under ids 0 to 4 and runs the server until it stops.
///
/// The process exits with status 1 if the server reports an error.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_target(false)
        .init();
    info!("Starting Cap'n Web Example Server");

    let config = ServerConfig {
        port: 8080,
        host: "127.0.0.1".to_string(),
        max_batch_size: 100,
    };
    // Derive the advertised address from the configuration (before `config` is moved
    // into the server) so the log output cannot drift from the configured host/port.
    let address = format!("{}:{}", config.host, config.port);

    let server = Server::new(config);
    server.register_capability(CapId::new(0), Arc::new(MainService));
    server.register_capability(CapId::new(1), Arc::new(CounterService::new()));
    server.register_capability(CapId::new(2), Arc::new(KeyValueStore::new()));
    server.register_capability(CapId::new(3), Arc::new(TimeService));
    server.register_capability(CapId::new(4), Arc::new(MathService));

    info!("Server configured with example services:");
    info!(" - CapId(0): Main Service (bootstrap)");
    info!(" - CapId(1): Counter Service");
    info!(" - CapId(2): Key-Value Store");
    info!(" - CapId(3): Time Service");
    info!(" - CapId(4): Math Service");
    info!("Starting server on http://{}", address);
    info!("Endpoints:");
    info!(" - HTTP Batch: http://{}/rpc/batch", address);
    info!(" - WebSocket: ws://{}/rpc/ws", address);

    if let Err(e) = server.run().await {
        warn!("Server error: {}", e);
        std::process::exit(1);
    }
    Ok(())
}