flow-server 0.1.1

High-performance axum web server with SSE and WebSocket for Flow task monitoring

flow-server

High-performance web server with real-time updates via Server-Sent Events (SSE) and WebSockets, built on axum and tokio.

What it does

flow-server is the web backend for the Flow Kanban board. It's like a smart waiter in a restaurant:

  • Serves files: Delivers the web interface (HTML, CSS, JavaScript)
  • Provides data: Responds to API requests for tasks, sessions, and features
  • Watches for changes: Monitors the file system and instantly notifies all connected browsers
  • Manages real-time updates: Uses SSE and WebSockets to push changes to clients without them asking

It's built for speed and concurrency: responses typically take about a millisecond or less (see Response Times), it handles hundreds of concurrent connections, and it uses async I/O throughout.

Architecture

flow-server/
├── lib.rs              - Server setup and router configuration
├── state.rs            - Shared application state and metadata cache
├── routes/
│   ├── mod.rs          - Route organization
│   ├── sessions.rs     - List/get sessions
│   ├── tasks.rs        - Task CRUD operations
│   ├── features.rs     - Feature API (if database enabled)
│   └── theme.rs        - Theme management
├── sse.rs              - Server-Sent Events handler
├── ws.rs               - WebSocket handler
├── watcher.rs          - File system watcher (notify crate)
├── error.rs            - Error types and conversions
└── helpers.rs          - Utility functions

Data Flow

File System Change
       ↓
  File Watcher (notify)
       ↓
  Broadcast Channel (tokio)
       ↓         ↓
      SSE       WebSocket
       ↓         ↓
   Browser    Agent CLI
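
The fan-out step in the diagram can be sketched with the standard library alone. This sketch swaps in std::sync::mpsc channels for tokio's broadcast channel so that it runs without dependencies. The Hub type and its methods are hypothetical and are not part of flow-server.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: one sender per connected client.
/// flow-server itself uses a tokio broadcast channel instead.
pub struct Hub {
    clients: Vec<Sender<String>>,
}

impl Hub {
    pub fn new() -> Self {
        Hub { clients: Vec::new() }
    }

    /// Register a client (an SSE stream or a WebSocket) and return its receiver.
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.clients.push(tx);
        rx
    }

    /// Send one message to every client, dropping clients whose receiver is gone.
    /// Returns how many clients received the message.
    pub fn broadcast(&mut self, msg: &str) -> usize {
        self.clients.retain(|tx| tx.send(msg.to_string()).is_ok());
        self.clients.len()
    }
}
```

Each file-system change would become one broadcast call, and every subscriber, whether SSE or WebSocket, sees the same message.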

Usage

Starting the server

use flow_server::run_server;
use flow_core::AgentConfig;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure server
    let config = AgentConfig {
        port: 3456,
        tasks_dir: Some("/Users/username/.claude/tasks".into()),
        projects_dir: Some("/Users/username/.claude/projects".into()),
        public_dir: Some("./public".into()),
        open_browser: true,
        ..Default::default()
    };

    // Run server (blocks until shutdown)
    run_server(config).await?;
    Ok(())
}

Building a custom router

use flow_server::{build_router, AppState, MetadataCache};
use tokio::sync::{broadcast, RwLock};
use std::sync::Arc;

// Create shared state
let (tx, _rx) = broadcast::channel(256);
let state = Arc::new(AppState {
    tasks_dir: "/tmp/tasks".into(),
    projects_dir: "/tmp/projects".into(),
    tx,
    metadata_cache: RwLock::new(MetadataCache::new()),
    db: None, // Optional database
});

// Build router
let app = build_router(state);

// Start server
let listener = tokio::net::TcpListener::bind("127.0.0.1:3456").await?;
axum::serve(listener, app).await?;

Client-side: Consuming SSE events

// Connect to Server-Sent Events
const eventSource = new EventSource('/api/events');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'connected':
      console.log('Connected to server');
      break;
    case 'update':
      console.log('File changed:', data.sessionId, data.file);
      refreshTaskList();
      break;
    case 'metadata-update':
      console.log('Project metadata changed');
      refreshSessions();
      break;
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
};

Client-side: Using WebSocket

// Connect to WebSocket
const ws = new WebSocket('ws://localhost:3456/api/ws');

ws.onopen = () => {
  console.log('WebSocket connected');

  // Send agent status update
  ws.send(JSON.stringify({
    type: 'agent-status',
    agent: 'codex',
    status: 'working',
    task: 'Implementing OAuth'
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

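// Send a ping every 30 seconds while the socket is open
const pingTimer = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping' }));
  }
}, 30000);

// Stop pinging once the connection closes
ws.onclose = () => clearInterval(pingTimer);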

Making API requests

use reqwest;
use serde_json::Value;

// Get all sessions
let sessions: Value = reqwest::get("http://localhost:3456/api/sessions")
    .await?
    .json()
    .await?;

println!("Sessions: {}", sessions);

// Get all tasks across all sessions
let tasks: Value = reqwest::get("http://localhost:3456/api/tasks/all")
    .await?
    .json()
    .await?;

println!("Tasks: {}", tasks);

// Delete a task
reqwest::Client::new()
    .delete("http://localhost:3456/api/tasks/session-123/task-456")
    .send()
    .await?;

API Reference

Endpoints

Method  Path                                  Description
GET     /api/sessions                         List all available sessions
GET     /api/sessions/:session_id             Get details for a specific session
GET     /api/tasks/all                        Get all tasks across all sessions
POST    /api/tasks/:session_id/:task_id/note  Add a note to a task
DELETE  /api/tasks/:session_id/:task_id       Delete a task
GET     /api/events                           Server-Sent Events (SSE) stream
GET     /api/ws                               WebSocket connection
GET     /api/theme                            Get current theme
POST    /api/theme                            Set theme
GET     /api/features/*                       Feature management (if DB enabled)

Response Formats

Sessions List:

{
  "sessions": [
    {
      "sessionId": "abc-123",
      "customTitle": "OAuth Implementation",
      "slug": "oauth-impl",
      "projectPath": "/Users/me/project",
      "taskCount": 5
    }
  ]
}

Tasks:

{
  "tasks": [
    {
      "id": "1",
      "subject": "Implement JWT auth",
      "description": "Add token generation and validation",
      "status": "in_progress",
      "owner": "claude",
      "blocks": ["2"],
      "blockedBy": []
    }
  ]
}

SSE Events:

{"type": "connected"}
{"type": "update", "event": "add", "sessionId": "abc-123", "file": "task-1.json"}
{"type": "metadata-update"}
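
On the wire, each of these JSON payloads travels as one SSE message: a "data:" line followed by a blank line. Because the messages carry no event name, the browser delivers them to onmessage. The helper below is a minimal sketch of that framing; sse_frame is a hypothetical name, and the server's own framing code may differ.

```rust
/// Frame a JSON payload as one Server-Sent Events message.
/// In the SSE wire format, a message is a `data:` line terminated by a blank line.
/// The payload is assumed to be single-line JSON (no embedded newlines).
pub fn sse_frame(json: &str) -> String {
    format!("data: {json}\n\n")
}
```

For example, sse_frame applied to the "connected" payload yields the line data: {"type": "connected"} followed by an empty line.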

Performance Features

Metadata Caching

Session metadata is expensive to compute (requires reading many JSONL files). The server caches it:

// Cache is automatically managed
// Refreshed on access once the cached copy is more than 10 seconds old
// Double-checked locking prevents duplicate refreshes

let metadata = get_metadata(&state).await;
// Fast: uses cache if less than 10s old
// Automatic: refreshes if stale
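
The double-checked locking pattern mentioned above can be sketched as follows. This is an illustration using std::sync::RwLock rather than the tokio lock the server uses; TimedCache and get_or_refresh are hypothetical names, not the crate's API.

```rust
use std::sync::RwLock;
use std::time::{Duration, Instant};

/// Hypothetical cache holding a value and the time it was computed.
pub struct TimedCache<T> {
    slot: RwLock<Option<(Instant, T)>>,
    ttl: Duration,
}

impl<T: Clone> TimedCache<T> {
    pub fn new(ttl: Duration) -> Self {
        TimedCache { slot: RwLock::new(None), ttl }
    }

    /// Return the cached value if it is fresh; otherwise recompute it with `refresh`.
    pub fn get_or_refresh(&self, refresh: impl FnOnce() -> T) -> T {
        // First check: cheap shared read lock.
        {
            let slot = self.slot.read().unwrap();
            if let Some((at, value)) = slot.as_ref() {
                if at.elapsed() < self.ttl {
                    return value.clone();
                }
            }
        }
        // Second check under the write lock: another caller may have
        // refreshed the cache while this one was waiting for the lock.
        let mut slot = self.slot.write().unwrap();
        if let Some((at, value)) = slot.as_ref() {
            if at.elapsed() < self.ttl {
                return value.clone();
            }
        }
        let value = refresh();
        *slot = Some((Instant::now(), value.clone()));
        value
    }
}
```

With a 10-second ttl, concurrent callers that all find the cache stale queue on the write lock, and only the first one pays for the refresh.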

File Watching

Uses the notify crate with debouncing to avoid duplicate events:

// Watches recursively:
// - ~/.claude/tasks/**/*.json
// - ~/.claude/projects/**/*.jsonl

// Broadcasts each debounced event to all SSE and WebSocket clients
// No polling: updates are pushed as soon as the watcher reports them
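
To illustrate what debouncing means here, the sketch below suppresses repeat events for the same path inside a short window. It is a simplified leading-edge variant written for this README, not the notify crate's debouncer; Debouncer and should_emit are hypothetical names.

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

/// Hypothetical debouncer: suppresses repeat events for the same path that
/// arrive within `window` of the last event that was let through.
pub struct Debouncer {
    window: Duration,
    last_seen: HashMap<PathBuf, Instant>,
}

impl Debouncer {
    pub fn new(window: Duration) -> Self {
        Debouncer { window, last_seen: HashMap::new() }
    }

    /// Returns true if an event for `path` observed at `now` should be forwarded.
    pub fn should_emit(&mut self, path: &Path, now: Instant) -> bool {
        let recent = match self.last_seen.get(path) {
            Some(last) => now.duration_since(*last) < self.window,
            None => false,
        };
        if recent {
            return false;
        }
        self.last_seen.insert(path.to_path_buf(), now);
        true
    }
}
```

Editors often write a file in several bursts, so without this step one save could produce several identical "update" messages.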

Async Everything

  • All I/O is async (tokio runtime)
  • File reading is non-blocking
  • Database queries are non-blocking
  • Broadcast sends never wait on slow clients (tokio's broadcast channel drops the oldest messages for lagging receivers instead)

Response Times

On a modern laptop:

  • Static file serving: ~0.6ms
  • Session list (with cache): ~1.2ms
  • Task queries: ~0.6ms
  • SSE connection setup: ~1ms
  • WebSocket upgrade: ~1ms

Configuration

Environment Variables

# Server port (default: 3456)
PORT=8080

# Tasks directory (default: ~/.claude/tasks)
TASKS_DIR=/custom/path/tasks

# Projects directory (default: ~/.claude/projects)
PROJECTS_DIR=/custom/path/projects

# Public files directory (default: ./public)
PUBLIC_DIR=/path/to/frontend

# Open browser on startup (default: false)
OPEN_BROWSER=true
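
One way to read these variables with their defaults is sketched below. EnvSettings and settings_from are hypothetical names; how flow-server parses its environment is not shown in this README, so the rules here (for example, that only the literal "true" enables OPEN_BROWSER, and that an unparsable PORT falls back to the default) are assumptions.

```rust
use std::path::PathBuf;

/// Hypothetical settings struct mirroring a few of the variables above.
#[derive(Debug, PartialEq)]
pub struct EnvSettings {
    pub port: u16,
    /// None means "use the default directory" (~/.claude/tasks).
    pub tasks_dir: Option<PathBuf>,
    pub open_browser: bool,
}

/// Build settings from a lookup function, so the parsing can be exercised
/// without touching the real process environment.
pub fn settings_from(lookup: impl Fn(&str) -> Option<String>) -> EnvSettings {
    EnvSettings {
        // A missing or unparsable PORT falls back to 3456.
        port: lookup("PORT").and_then(|v| v.parse().ok()).unwrap_or(3456),
        tasks_dir: lookup("TASKS_DIR").map(PathBuf::from),
        // Assumption: only the literal string "true" enables the browser.
        open_browser: lookup("OPEN_BROWSER").map_or(false, |v| v == "true"),
    }
}
```

In a real binary the lookup would be the process environment: settings_from(|key| std::env::var(key).ok()).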

Programmatic Configuration

use flow_core::AgentConfig;
use std::path::PathBuf;

let config = AgentConfig {
    port: 3000,
    tasks_dir: Some(PathBuf::from("/tmp/tasks")),
    projects_dir: Some(PathBuf::from("/tmp/projects")),
    public_dir: Some(PathBuf::from("./dist")),
    open_browser: false,
    db_path: Some(PathBuf::from("features.db")), // Optional
    ..Default::default()
};

File Watcher Details

Tasks Directory Watching

~/.claude/tasks/
  └── session-abc-123/
      ├── task-1.json    ← Watched
      ├── task-2.json    ← Watched
      └── task-3.json    ← Watched

Events generated:

  • File created → {"type": "update", "event": "add", "sessionId": "...", "file": "task-1.json"}
  • File modified → {"type": "update", "event": "change", "sessionId": "...", "file": "task-1.json"}
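
The mapping from a changed path to the payload fields can be sketched like this: the first directory under the tasks root becomes sessionId, and the file name becomes file. update_payload is a hypothetical helper, and it assumes the names need no JSON escaping.

```rust
use std::path::Path;

/// Hypothetical helper: derive the `update` payload for a changed task file.
/// `kind` is "add" or "change". Returns None for paths that are not of the
/// form `<tasks_dir>/<session>/<name>.json`.
pub fn update_payload(tasks_dir: &Path, changed: &Path, kind: &str) -> Option<String> {
    let rel = changed.strip_prefix(tasks_dir).ok()?;
    let mut parts = rel.components();
    let session_id = parts.next()?.as_os_str().to_str()?;
    let file = parts.next()?.as_os_str().to_str()?;
    // Reject deeper paths and files that are not JSON.
    if parts.next().is_some() || !file.ends_with(".json") {
        return None;
    }
    Some(format!(
        r#"{{"type": "update", "event": "{kind}", "sessionId": "{session_id}", "file": "{file}"}}"#
    ))
}
```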

Projects Directory Watching

~/.claude/projects/
  └── my-project/
      ├── session-abc.jsonl           ← Watched
      └── sessions-index.json         ← Watched (future)

Events generated:

  • JSONL modified → {"type": "metadata-update"}

Handling Missing Directories

If ~/.claude/tasks doesn't exist yet, the watcher watches the parent directory (~/.claude) in non-recursive mode. Once tasks/ is created, it switches to watching tasks/ recursively.
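
The fallback decision can be sketched as a small pure function. Mode is a stand-in for the notify crate's recursive/non-recursive setting, and watch_target is a hypothetical helper rather than the crate's actual code.

```rust
use std::path::{Path, PathBuf};

/// Stand-in for the watcher's recursion setting.
#[derive(Debug, PartialEq)]
pub enum Mode {
    Recursive,
    NonRecursive,
}

/// Hypothetical helper: decide what to watch for a target directory.
/// If the directory exists, watch it recursively. Otherwise watch its
/// parent non-recursively so the directory's creation can be noticed.
/// Returns None if neither exists.
pub fn watch_target(dir: &Path) -> Option<(PathBuf, Mode)> {
    if dir.is_dir() {
        return Some((dir.to_path_buf(), Mode::Recursive));
    }
    let parent = dir.parent()?;
    parent
        .is_dir()
        .then(|| (parent.to_path_buf(), Mode::NonRecursive))
}
```

When a create event for the target arrives on the parent watch, calling watch_target again returns the recursive variant.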

Testing

# Run all tests
cargo test -p flow-server

# Run with output
cargo test -p flow-server -- --nocapture

# Test specific module
cargo test -p flow-server routes::

# Integration test: start a test server
cargo run -p flow-server -- --port 9999

Manual Testing

# Start server
cargo run -p flow-server

# In another terminal, test SSE
curl -N http://localhost:3456/api/events

# Test API
curl http://localhost:3456/api/sessions | jq

# Create a task file and watch the SSE stream report it
mkdir -p ~/.claude/tasks/test
echo '{"id":"1","subject":"Test"}' > ~/.claude/tasks/test/1.json

WebSocket vs SSE

When to use SSE (Server-Sent Events)

  • Unidirectional: Server to client only
  • Automatic reconnection: Browser handles it
  • Text-based: Simple JSON messages
  • Firewall-friendly: Uses HTTP
  • Use case: Task updates, file changes

When to use WebSocket

  • Bidirectional: Client can send messages
  • Binary support: Can send binary data
  • Lower per-message overhead: No HTTP headers after the initial upgrade
  • Full-duplex: Simultaneous send/receive
  • Use case: Agent status updates, chat, real-time collaboration

Error Handling

// AppError is exported by the crate (use flow_server::AppError;).
// Its definition is reproduced here for reference: a custom error type
// with automatic conversion to HTTP responses.
pub enum AppError {
    NotFound(String),
    IoError(std::io::Error),
    JsonError(serde_json::Error),
    DatabaseError(String),
}

// Automatically converts to appropriate HTTP status codes
// NotFound → 404
// IoError → 500
// JsonError → 400
// DatabaseError → 500
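
The status mapping listed above can be written out as a plain function. This is a dependency-free sketch: the enum here is a local stand-in in which JsonError wraps a String instead of serde_json::Error, and status_code is a hypothetical method. The real crate converts errors into axum responses instead.

```rust
/// Local stand-in for the crate's error type, kept dependency-free.
pub enum AppError {
    NotFound(String),
    IoError(std::io::Error),
    /// The real variant wraps serde_json::Error; a String is used here.
    JsonError(String),
    DatabaseError(String),
}

impl AppError {
    /// HTTP status code for each variant, following the mapping listed above.
    pub fn status_code(&self) -> u16 {
        match self {
            AppError::NotFound(_) => 404,
            AppError::JsonError(_) => 400,
            AppError::IoError(_) | AppError::DatabaseError(_) => 500,
        }
    }
}

/// Lets code use `?` on I/O results inside functions returning AppError.
impl From<std::io::Error> for AppError {
    fn from(err: std::io::Error) -> Self {
        AppError::IoError(err)
    }
}
```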
