flow-server
High-performance web server with real-time updates via Server-Sent Events (SSE) and WebSockets, built on axum and tokio.
What it does
flow-server is the web backend for the Flow Kanban board. It's like a smart waiter in a restaurant:
- Serves files: Delivers the web interface (HTML, CSS, JavaScript)
- Provides data: Responds to API requests for tasks, sessions, and features
- Watches for changes: Monitors the file system and instantly notifies all connected browsers
- Manages real-time updates: Uses SSE and WebSockets to push changes to clients without them asking
It's built for speed and concurrency: sub-millisecond response times, handles hundreds of concurrent connections, and uses async I/O throughout.
Architecture
flow-server/
├── lib.rs - Server setup and router configuration
├── state.rs - Shared application state and metadata cache
├── routes/
│ ├── mod.rs - Route organization
│ ├── sessions.rs - List/get sessions
│ ├── tasks.rs - Task CRUD operations
│ ├── features.rs - Feature API (if database enabled)
│ └── theme.rs - Theme management
├── sse.rs - Server-Sent Events handler
├── ws.rs - WebSocket handler
├── watcher.rs - File system watcher (notify crate)
├── error.rs - Error types and conversions
└── helpers.rs - Utility functions
Data Flow
File System Change
↓
File Watcher (notify)
↓
Broadcast Channel (tokio)
↓ ↓
SSE WebSocket
↓ ↓
Browser Agent CLI
Usage
Starting the server
use run_server;
use AgentConfig;
async
Building a custom router
use ;
use ;
use Arc;
// Create shared state
let = channel;
let state = new;
// Build router
let app = build_router;
// Start server
let listener = bind.await?;
serve.await?;
Client-side: Consuming SSE events
// Connect to Server-Sent Events
const eventSource = ;
eventSource ;
eventSource ;
Client-side: Using WebSocket
// Connect to WebSocket
const ws = ;
ws ;
ws ;
// Send periodic ping
setInterval;
Making API requests
use reqwest;
use Value;
// Get all sessions
let sessions: Value = get
.await?
.json
.await?;
println!;
// Get tasks for a specific session
let tasks: Value = get
.await?
.json
.await?;
println!;
// Delete a task
new
.delete
.send
.await?;
API Reference
Endpoints
| Method | Path | Description |
|---|---|---|
GET |
/api/sessions |
List all available sessions |
GET |
/api/sessions/:session_id |
Get details for a specific session |
GET |
/api/tasks/all |
Get all tasks across all sessions |
POST |
/api/tasks/:session_id/:task_id/note |
Add a note to a task |
DELETE |
/api/tasks/:session_id/:task_id |
Delete a task |
GET |
/api/events |
Server-Sent Events (SSE) stream |
GET |
/api/ws |
WebSocket connection |
GET |
/api/theme |
Get current theme |
POST |
/api/theme |
Set theme |
GET |
/api/features/* |
Feature management (if DB enabled) |
Response Formats
Sessions List:
Tasks:
SSE Events:
Performance Features
Metadata Caching
Session metadata is expensive to compute (requires reading many JSONL files). The server caches it:
// Cache is automatically managed
// Refreshed every 10 seconds when accessed
// Double-checked locking prevents duplicate refreshes
let metadata = get_metadata.await;
// Fast: uses cache if less than 10s old
// Automatic: refreshes if stale
File Watching
Uses the notify crate with debouncing to avoid duplicate events:
// Watches recursively:
// - ~/.claude/tasks/**/*.json
// - ~/.claude/projects/**/*.jsonl
// Broadcasts to all SSE and WebSocket clients instantly
// Zero polling - instant updates
Async Everything
- All I/O is async (tokio runtime)
- File reading is non-blocking
- Database queries are non-blocking
- Broadcast channel is lock-free
Response Times
On a modern laptop:
- Static file serving: ~0.6ms
- Session list (with cache): ~1.2ms
- Task queries: ~0.6ms
- SSE connection setup: ~1ms
- WebSocket upgrade: ~1ms
Configuration
Environment Variables
# Server port (default: 3456)
PORT=8080
# Tasks directory (default: ~/.claude/tasks)
TASKS_DIR=/custom/path/tasks
# Projects directory (default: ~/.claude/projects)
PROJECTS_DIR=/custom/path/projects
# Public files directory (default: ./public)
PUBLIC_DIR=/path/to/frontend
# Open browser on startup (default: false)
OPEN_BROWSER=true
Programmatic Configuration
use AgentConfig;
use PathBuf;
let config = AgentConfig ;
File Watcher Details
Tasks Directory Watching
~/.claude/tasks/
└── session-abc-123/
├── task-1.json ← Watched
├── task-2.json ← Watched
└── task-3.json ← Watched
Events generated:
- File created →
{"type": "update", "event": "add", "sessionId": "...", "file": "task-1.json"} - File modified →
{"type": "update", "event": "change", "sessionId": "...", "file": "task-1.json"}
Projects Directory Watching
~/.claude/projects/
└── my-project/
├── session-abc.jsonl ← Watched
└── sessions-index.json ← Watched (future)
Events generated:
- JSONL modified →
{"type": "metadata-update"}
Handling Missing Directories
If ~/.claude/tasks doesn't exist yet, the watcher watches the parent (~/.claude) with non-recursive mode. When tasks/ is created, it starts watching recursively.
Testing
# Run all tests
# Run with output
# Test specific module
# Integration test: start a test server
Manual Testing
# Start server
# In another terminal, test SSE
# Test API
|
# Create a task file and watch SSE update
WebSocket vs SSE
When to use SSE (Server-Sent Events)
- ✅ Unidirectional: Server to client only
- ✅ Automatic reconnection: Browser handles it
- ✅ Text-based: Simple JSON messages
- ✅ Firewall-friendly: Uses HTTP
- ✅ Use case: Task updates, file changes
When to use WebSocket
- ✅ Bidirectional: Client can send messages
- ✅ Binary support: Can send binary data
- ✅ Lower latency: No HTTP overhead
- ✅ Full-duplex: Simultaneous send/receive
- ✅ Use case: Agent status updates, chat, real-time collaboration
Error Handling
use AppError;
use ;
// Custom error type with automatic conversion to HTTP responses
// Automatically converts to appropriate HTTP status codes
// NotFound → 404
// IoError → 500
// JsonError → 400
// DatabaseError → 500
Related Crates
- flow-core: Core types and configuration
- flow-db: Database backend (optional)
- flow-resolver: Dependency sorting for API responses
- flow-tui: Terminal UI alternative interface