# Using PegBoard with Tokio
## Overview
PegBoard is designed for concurrent access in async Rust applications. This guide shows how to use it with tokio.
## Key Points
### ✅ PegBoard is Already Thread-Safe
- `DashMap` provides concurrent read/write access
- All read methods (`get_tool`, `list_tools`, etc.) use `&self`
- Multiple tasks can read concurrently without blocking
### ✅ Use `Arc<PegBoard>` to Share
- Wrap PegBoard in `Arc` after registration
- Clone the Arc to share across tasks
- Services are shared, not duplicated
### ❌ Don't Implement Clone
- Services contain active connections
- Cloning would duplicate internal state incorrectly
- Arc is the idiomatic Rust solution
## Basic Pattern
```rust
use std::sync::Arc;
use magi_tool::PegBoard;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Step 1: Create and populate PegBoard
let mut pegboard = PegBoard::new();
// Register all your services
pegboard.add_service(Some("web".to_string()), web_service).await?;
pegboard.add_service(Some("fs".to_string()), fs_service).await?;
pegboard.add_service(None, calculator_service).await?;
// Step 2: Wrap in Arc (now immutable)
let pegboard = Arc::new(pegboard);
// Step 3: Use across tasks
handle_requests(Arc::clone(&pegboard)).await?;
Ok(())
}
```
## Complete Example: Web Server
```rust
use std::sync::Arc;
use axum::{Router, routing::post, extract::State, Json};
use magi_tool::{PegBoard, Tool};
use serde::{Deserialize, Serialize};
#[derive(Clone)]
struct AppState {
pegboard: Arc<PegBoard>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize PegBoard
let mut pegboard = PegBoard::new();
// Register MCP services
let web_service = connect_to_web_mcp().await?;
pegboard.add_service(Some("web".to_string()), web_service).await?;
let fs_service = connect_to_fs_mcp().await?;
pegboard.add_service(Some("fs".to_string()), fs_service).await?;
// Wrap in Arc for sharing
let state = AppState {
pegboard: Arc::new(pegboard),
};
// Build router
let app = Router::new()
.route("/tools", post(list_tools))
.route("/call", post(call_tool))
.with_state(state);
// Start server
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
println!("Server running on http://127.0.0.1:3000");
axum::serve(listener, app).await?;
Ok(())
}
#[derive(Serialize)]
struct ToolsResponse {
tools: Vec<Tool>,
}
// Handler: List all tools
async fn list_tools(
State(state): State<AppState>,
) -> Json<ToolsResponse> {
// Concurrent access - multiple requests can call this simultaneously
let tools = state.pegboard.get_all_tools();
Json(ToolsResponse { tools })
}
#[derive(Deserialize)]
struct CallToolRequest {
tool_name: String,
params: serde_json::Value,
}
#[derive(Serialize)]
struct CallToolResponse {
result: serde_json::Value,
}
// Handler: Call a tool
async fn call_tool(
State(state): State<AppState>,
Json(req): Json<CallToolRequest>,
) -> Result<Json<CallToolResponse>, String> {
// Call the tool directly - automatic routing!
let result = state
.pegboard
.call_tool(&req.tool_name, req.params)
.await
.map_err(|e| format!("Tool call failed: {:?}", e))?;
// Return the result
Ok(Json(CallToolResponse {
result: result.structured_content.unwrap_or_else(|| {
serde_json::json!({"content": result.content})
}),
}))
}
```
## Pattern: Multiple Concurrent Tasks
```rust
use std::sync::Arc;
use magi_tool::PegBoard;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut pegboard = PegBoard::new();
// Register services
pegboard.add_service(Some("web".to_string()), web_service).await?;
let pegboard = Arc::new(pegboard);
// Spawn multiple tasks that all access the same PegBoard
let mut handles = vec![];
// Task 1: Process LLM requests
let pb1 = Arc::clone(&pegboard);
handles.push(tokio::spawn(async move {
loop {
let tools = pb1.get_all_tools();
// Send to LLM, get response, route calls...
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}));
// Task 2: Health monitoring
let pb2 = Arc::clone(&pegboard);
handles.push(tokio::spawn(async move {
loop {
let count = pb2.tool_count();
println!("Total tools: {}", count);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}));
// Task 3: Query specific tools
let pb3 = Arc::clone(&pegboard);
handles.push(tokio::spawn(async move {
if let Some(tool) = pb3.get_tool("web-search") {
println!("Found tool: {}", tool.name);
}
}));
// Wait for all tasks (they run forever in this example)
for handle in handles {
let _ = handle.await;
}
Ok(())
}
```
## Pattern: Per-Request Task Spawning
```rust
use std::sync::Arc;
use magi_tool::PegBoard;
async fn handle_llm_request(
pegboard: Arc<PegBoard>,
user_request: String,
) -> Result<String, Box<dyn std::error::Error>> {
// Get tools for LLM
let tools = pegboard.get_all_tools();
// Send to LLM (simulated)
let llm_response = send_to_llm(&user_request, &tools).await?;
// Spawn tasks to handle tool calls in parallel
let mut tasks = vec![];
for tool_call in llm_response.tool_calls {
let pb = Arc::clone(&pegboard);
let task = tokio::spawn(async move {
execute_tool_call(&pb, tool_call).await
});
tasks.push(task);
}
// Wait for all tool calls to complete
let results = futures::future::join_all(tasks).await;
// Process results...
Ok("Success".to_string())
}
async fn execute_tool_call(
pegboard: &PegBoard,
tool_call: ToolCall,
) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
// Execute directly - automatic routing!
let result = pegboard
.call_tool(&tool_call.name, tool_call.params)
.await?;
// Return the structured content or format the content
Ok(result.structured_content.unwrap_or_else(|| {
serde_json::json!({"content": result.content})
}))
}
```
## Memory Overhead
Using `Arc<PegBoard>` has minimal overhead:
- **Arc itself**: 16 bytes (pointer + ref counts)
- **Cloning Arc**: Just increments a ref count (cheap)
- **PegBoard data**: Only one copy in memory, shared by all tasks
## Performance Characteristics
| `get_tool()` | Multiple readers | Lock-free, O(1) |
| `list_all_tools()` | Multiple readers | Lock-free |
| `get_tool_route()` | Multiple readers | Lock-free, O(1) |
| Arc cloning | Any time | Cheap (atomic increment) |
## Anti-Patterns to Avoid
### ❌ Don't: Try to implement Clone
```rust
// DON'T DO THIS
impl Clone for PegBoard {
fn clone(&self) -> Self {
// This won't work - services can't be cloned
// And even if they could, you'd duplicate connections
}
}
```
### ❌ Don't: Use Mutex<PegBoard>
```rust
// DON'T DO THIS - unnecessary overhead
let pegboard = Arc::new(Mutex::new(pegboard));
// PegBoard's DashMap already handles concurrency
// Mutex would serialize all access unnecessarily
```
### ❌ Don't: Clone the entire PegBoard
```rust
// DON'T DO THIS
let pegboard2 = pegboard.clone(); // Won't compile anyway
// DO THIS instead
let pegboard2 = Arc::clone(&pegboard); // Share, don't duplicate
```
## Summary
**Pattern to Use:**
1. Create `PegBoard` and register services (mutable)
2. Wrap in `Arc::new()` (now immutable)
3. Clone `Arc` to share across tasks
4. All read operations are concurrent and lock-free
**Why This Works:**
- DashMap handles concurrent reads/writes internally
- Services are shared (not duplicated) via Arc
- Zero-cost abstraction - as efficient as possible
- Idiomatic Rust for sharing data across async tasks
## Questions?
- Need to add services after wrapping in Arc? Consider `Arc<Mutex<PegBoard>>` for the registration phase only
- Need to remove tools dynamically? Use the `unregister_*` methods
- Performance concerns? DashMap is optimized for concurrent access
The `Arc<PegBoard>` pattern is the recommended and most efficient way to use PegBoard with tokio!