coglet
Core Rust library for the coglet prediction server. It is pure Rust with no Python
dependencies; the Python bindings live in coglet-python.
Architecture
coglet
┌─────────────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ transport/http │ │
│ │ ┌──────────────┐ ┌─────────────────────────────────┐ │ │
│ │ │ server.rs │ │ routes.rs │ │ │
│ │ │ Axum setup │ │ /health, /predictions, /cancel │ │ │
│ │ └──────────────┘ └─────────────────────────────────┘ │ │
│ └───────────────────────────────┬─────────────────────────┘ │
│ │ │
│ ┌───────────────────────────────▼─────────────────────────┐ │
│ │ service.rs │ │
│ │ PredictionService: health, permits, state, webhooks │ │
│ └───────────────────────────────┬─────────────────────────┘ │
│ │ │
│ ┌────────────────────────┼────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌────────────────────┐ ┌──────────┐ │
│ │ permit/ │ │ orchestrator.rs │ │webhook.rs│ │
│ │ PermitPool │ │ Parent-side: │ │ Sender │ │
│ │ Slot alloc │ │ spawn, route │ │ Retry │ │
│ └─────────────┘ └─────────┬──────────┘ └──────────┘ │
│ │ │
│ ┌────────────────────────────▼────────────────────────────┐ │
│ │ bridge/ │ │
│ │ ┌──────────────┐ ┌─────────────┐ ┌────────────────┐ │ │
│ │ │ protocol.rs │ │ codec.rs │ │ transport.rs │ │ │
│ │ │ Message types│ │ JSON lines │ │ Unix sockets │ │ │
│ │ └──────────────┘ └─────────────┘ └────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ worker.rs │ │
│ │ Child-side: PredictHandler trait, run_worker loop │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Directory Structure
coglet/
└── src/
├── lib.rs # Public API exports
│
│ # Core Types
├── health.rs # Health, SetupStatus, SetupResult
├── prediction.rs # Prediction state machine
├── predictor.rs # PredictionResult, PredictionError, PredictionOutput
├── version.rs # VersionInfo
│
│ # Service Layer
├── service.rs # PredictionService - lifecycle, state, webhooks
├── webhook.rs # WebhookSender, webhook types
│
│ # Orchestrator (Parent Process)
├── orchestrator.rs # spawn_worker, OrchestratorHandle, event loop
│
│ # Worker (Child Process)
├── worker.rs # run_worker, PredictHandler trait, SetupError
│
│ # Concurrency Control
├── permit/
│ ├── mod.rs
│ ├── pool.rs # PermitPool - slot permit management
│ └── slot.rs # PredictionSlot - permit + prediction binding
│
│ # IPC Bridge
├── bridge/
│ ├── mod.rs
│ ├── protocol.rs # ControlRequest, ControlResponse, SlotRequest, SlotResponse
│ ├── codec.rs # JsonCodec - newline-delimited JSON
│ └── transport.rs # Unix socket transport, ChildTransportInfo
│
│ # HTTP Transport
└── transport/
├── mod.rs
└── http/
├── mod.rs
├── server.rs # ServerConfig, serve()
└── routes.rs # Route handlers, request/response types
Key Components
PredictionService (service.rs)
Single owner of prediction state. Manages:
- Health state (Unknown → Starting → Ready/SetupFailed)
- PermitPool + Orchestrator reference
- Active predictions (DashMap — single source of truth)
- Cancellation (cancel tokens + orchestrator delegation)
- Webhooks fire from Prediction mutation methods (no dual state)
let service = new_no_pool
.with_health
.with_version;
// Later, after worker is ready:
service.set_orchestrator.await;
service.set_health.await;
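The core idea behind PredictionService is a single owner for prediction state, with webhooks fired from inside the mutation methods. The following is a minimal, std-only sketch of that idea, not the crate's real API. `Service`, `Prediction`, `Status`, `start`, `update` and `webhooks` are hypothetical names. `Mutex<HashMap>` stands in for DashMap, and a `Vec<String>` stands in for the webhook sender. Only the builder-style `with_health`/`with_version` and `set_health` names mirror the snippet above, and their argument types are assumed here.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical, simplified stand-ins; not coglet's actual types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Health { Unknown, Starting, Ready }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status { Starting, Processing, Succeeded }

struct Prediction {
    status: Status,
    // Recorded webhook events; a Vec stands in for a real webhook sender.
    webhooks: Vec<String>,
}

impl Prediction {
    // Every state change goes through this method, which also fires the webhook,
    // so no second copy of the state can drift out of sync.
    fn set_status(&mut self, status: Status) {
        self.status = status;
        self.webhooks.push(format!("{:?}", status));
    }
}

struct Service {
    health: Mutex<Health>,
    version: Option<String>,
    // Mutex<HashMap> stands in for DashMap to keep the sketch std-only.
    predictions: Mutex<HashMap<String, Prediction>>,
}

impl Service {
    fn new() -> Self {
        Service { health: Mutex::new(Health::Unknown), version: None, predictions: Mutex::new(HashMap::new()) }
    }
    fn with_health(mut self, health: Health) -> Self {
        *self.health.get_mut().unwrap() = health;
        self
    }
    fn with_version(mut self, version: &str) -> Self {
        self.version = Some(version.to_string());
        self
    }
    fn set_health(&self, health: Health) {
        *self.health.lock().unwrap() = health;
    }
    fn health(&self) -> Health {
        *self.health.lock().unwrap()
    }
    fn start(&self, id: &str) {
        let p = Prediction { status: Status::Starting, webhooks: Vec::new() };
        self.predictions.lock().unwrap().insert(id.to_string(), p);
    }
    // Mutates the single stored Prediction; returns false for an unknown id.
    fn update(&self, id: &str, status: Status) -> bool {
        match self.predictions.lock().unwrap().get_mut(id) {
            Some(p) => { p.set_status(status); true }
            None => false,
        }
    }
    fn webhooks(&self, id: &str) -> Vec<String> {
        self.predictions.lock().unwrap().get(id).map(|p| p.webhooks.clone()).unwrap_or_default()
    }
}
```

Because `update` is the only way to change a prediction, a webhook cannot be skipped or sent for state that was never stored.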
Orchestrator (orchestrator.rs)
Parent-side worker lifecycle management.
spawn_worker(config)
│
├─▶ Create Unix socket transport (N slots)
├─▶ Spawn: python -c "import coglet; coglet.server._run_worker()"
├─▶ Send Init message via stdin
├─▶ Wait for worker to connect sockets
├─▶ Wait for Ready message (with timeout)
├─▶ Populate PermitPool with slot writers
├─▶ Spawn event loop task
└─▶ Return OrchestratorReady {pool, schema, handle}
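The "Wait for Ready message (with timeout)" step can be sketched as a receive loop under a single deadline. Setup logs that arrive first are skipped, and a closed channel is treated as the worker exiting. This is an assumption-laden, std-only illustration. `WorkerMsg`, `SpawnError` and `wait_for_ready` are hypothetical, and an `mpsc` channel stands in for the stdout control channel.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

// Hypothetical message and error types for illustration only.
#[derive(Debug, PartialEq)]
enum WorkerMsg {
    Log(String),
    Ready { slots: usize, schema: String },
}

#[derive(Debug, PartialEq)]
enum SpawnError {
    Timeout,
    WorkerExited,
}

// Waits for Ready under one overall deadline; logs do not reset the clock.
fn wait_for_ready(rx: &Receiver<WorkerMsg>, timeout: Duration) -> Result<(usize, String), SpawnError> {
    let deadline = Instant::now() + timeout;
    loop {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(WorkerMsg::Ready { slots, schema }) => return Ok((slots, schema)),
            // Setup logs can arrive before Ready; keep waiting.
            Ok(WorkerMsg::Log(_)) => continue,
            Err(RecvTimeoutError::Timeout) => return Err(SpawnError::Timeout),
            // All senders dropped: the worker went away before becoming ready.
            Err(RecvTimeoutError::Disconnected) => return Err(SpawnError::WorkerExited),
        }
    }
}
```

Using a fixed deadline rather than a per-message timeout keeps a chatty setup from extending the wait indefinitely.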
Event loop handles:
- ControlResponse::Idle: slot ready for next prediction
- ControlResponse::Failed: slot poisoned, mark unavailable
- SlotResponse::Log/Output/Done/Failed: route to the prediction
- Worker crash: fail all in-flight predictions
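The routing above can be sketched as a single `match` over incoming events. Slot state and prediction results are updated in one place, and a crash drains every in-flight entry. This is a simplified, hypothetical model rather than the orchestrator's real types. `Event`, `SlotState`, `Outcome` and `Loop` are invented for illustration, and only a subset of the messages is modeled.

```rust
use std::collections::HashMap;

// Hypothetical, simplified event and state types (not coglet's real definitions).
enum Event {
    Idle { slot: usize },                  // ControlResponse::Idle
    SlotFailed { slot: usize },            // ControlResponse::Failed
    Output { slot: usize, value: String }, // SlotResponse::Output
    Done { slot: usize },                  // SlotResponse::Done
    WorkerCrashed,                         // control channel closed
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum SlotState { Idle, Busy, Poisoned }

#[derive(Debug, Clone, PartialEq)]
enum Outcome { Running(Vec<String>), Succeeded(Vec<String>), Failed(String) }

struct Loop {
    slots: Vec<SlotState>,
    in_flight: HashMap<usize, String>, // slot index -> prediction id
    results: HashMap<String, Outcome>, // prediction id -> outcome
}

impl Loop {
    fn handle(&mut self, ev: Event) {
        match ev {
            Event::Idle { slot } => self.slots[slot] = SlotState::Idle,
            Event::SlotFailed { slot } => self.slots[slot] = SlotState::Poisoned,
            Event::Output { slot, value } => {
                // Route the output to whichever prediction owns this slot.
                if let Some(id) = self.in_flight.get(&slot) {
                    if let Some(Outcome::Running(out)) = self.results.get_mut(id) {
                        out.push(value);
                    }
                }
            }
            Event::Done { slot } => {
                if let Some(id) = self.in_flight.remove(&slot) {
                    let out = match self.results.remove(&id) {
                        Some(Outcome::Running(out)) => out,
                        _ => Vec::new(),
                    };
                    self.results.insert(id, Outcome::Succeeded(out));
                }
            }
            Event::WorkerCrashed => {
                // Fail every prediction that was still running.
                for (_, id) in self.in_flight.drain() {
                    self.results.insert(id, Outcome::Failed("worker crashed".into()));
                }
            }
        }
    }
}
```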
Worker (worker.rs)
Child-side event loop. Implements PredictHandler trait.
run_worker(handler, config)
│
├─▶ Connect to slot sockets (from env)
├─▶ Setup control channel (stdin/stdout)
├─▶ Run handler.setup() with log routing
├─▶ Send Ready {slots, schema}
├─▶ Enter event loop:
│ - ControlRequest::Cancel → handler.cancel(slot)
│ - ControlRequest::Shutdown → exit
│ - SlotRequest::Predict → spawn prediction task
└─▶ Exit on shutdown or all slots poisoned
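A rough sketch of the worker loop: run setup, emit Ready, then dispatch requests until Shutdown. This is a synchronous, std-only approximation under stated assumptions. The `PredictHandler` methods shown here, `Request`, `Response` and this `run_worker` signature are hypothetical. Predictions run inline instead of as spawned tasks, and the schema field of Ready is omitted.

```rust
// Hypothetical, simplified shape of the worker-side handler trait.
trait PredictHandler {
    fn setup(&mut self) -> Result<(), String>;
    fn predict(&mut self, slot: usize, input: &str) -> Result<String, String>;
    fn cancel(&mut self, slot: usize);
}

enum Request {
    Predict { slot: usize, input: String }, // SlotRequest::Predict
    Cancel { slot: usize },                 // ControlRequest::Cancel
    Shutdown,                               // ControlRequest::Shutdown
}

#[derive(Debug, PartialEq)]
enum Response {
    Ready { slots: usize },
    Done { slot: usize, output: String },
    Failed { slot: usize, error: String },
}

// Returns the responses the worker would send, in order.
fn run_worker<H: PredictHandler>(
    handler: &mut H,
    slots: usize,
    requests: impl IntoIterator<Item = Request>,
) -> Result<Vec<Response>, String> {
    // A setup error aborts before Ready is ever sent.
    handler.setup()?;
    let mut sent = vec![Response::Ready { slots }];
    for req in requests {
        match req {
            Request::Cancel { slot } => handler.cancel(slot),
            Request::Shutdown => break,
            Request::Predict { slot, input } => sent.push(match handler.predict(slot, &input) {
                Ok(output) => Response::Done { slot, output },
                Err(error) => Response::Failed { slot, error },
            }),
        }
    }
    Ok(sent)
}
```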
PermitPool (permit/pool.rs)
Slot-based concurrency control.
let pool = new;
// Add slot with its socket writer
pool.add_permit;
// Acquire permit (returns None if at capacity)
let permit = pool.try_acquire?;
// Send prediction request
permit.send.await?;
// Return permit when done
drop(permit);
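The "return permit when done" step relies on RAII: dropping the permit puts its slot back. Below is a minimal std-only sketch of that pattern. The `PermitPool`, `Permit` and `available` definitions here are hypothetical simplifications. They store only a slot index, while the pool described above also pairs each slot with its socket writer.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical, simplified pool: each permit is bound to one slot index.
#[derive(Clone, Default)]
struct PermitPool {
    free: Arc<Mutex<Vec<usize>>>,
}

struct Permit {
    slot: usize,
    free: Arc<Mutex<Vec<usize>>>,
}

impl PermitPool {
    fn add_permit(&self, slot: usize) {
        self.free.lock().unwrap().push(slot);
    }
    // Returns None when every slot is in use (the "at capacity" case).
    fn try_acquire(&self) -> Option<Permit> {
        let slot = self.free.lock().unwrap().pop()?;
        Some(Permit { slot, free: Arc::clone(&self.free) })
    }
    fn available(&self) -> usize {
        self.free.lock().unwrap().len()
    }
}

impl Drop for Permit {
    // Dropping the permit returns its slot, so release cannot be forgotten.
    fn drop(&mut self) {
        self.free.lock().unwrap().push(self.slot);
    }
}
```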
Bridge Protocol (bridge/protocol.rs)
Message types for parent-worker communication.
Control Channel:
- ControlRequest: Init, Cancel, Shutdown
- ControlResponse: Ready, Log, Idle, Failed, Cancelled, ShuttingDown
Slot Channel:
- SlotRequest: Predict
- SlotResponse: Log, Output, Done, Failed, Cancelled
All messages are JSON with {"type": "..."} discriminator.
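To make the newline-delimited framing concrete, here is a std-only encoder that writes one tagged JSON object per line. It escapes newlines inside strings, so a message can never span two lines. This is an illustration, not the crate's codec. The lowercase `type` values and the `message`/`output` field names are assumptions. In Rust this shape is commonly produced with serde's internally tagged enums (`#[serde(tag = "type")]`), but whether coglet does so is not confirmed here.

```rust
// Hypothetical subset of slot responses; wire names are assumptions.
enum SlotResponse {
    Log(String),
    Output(String),
    Done,
    Cancelled,
}

// Minimal JSON string escaping for quotes, backslashes and control characters.
fn json_str(s: &str) -> String {
    let mut out = String::from("\"");
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

// Encodes one message as a single JSON object terminated by '\n'.
fn encode(msg: &SlotResponse) -> String {
    let body = match msg {
        SlotResponse::Log(m) => format!(r#"{{"type":"log","message":{}}}"#, json_str(m)),
        SlotResponse::Output(v) => format!(r#"{{"type":"output","output":{}}}"#, json_str(v)),
        SlotResponse::Done => r#"{"type":"done"}"#.to_string(),
        SlotResponse::Cancelled => r#"{"type":"cancelled"}"#.to_string(),
    };
    body + "\n"
}
```

On the receiving side, splitting the byte stream on `'\n'` then yields exactly one message per line.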
Behaviors
Health States
Unknown ──▶ Starting ──┬──▶ Ready ◀──▶ Busy
│
└──▶ SetupFailed ──▶ Defunct
- Unknown: Initial state, health-check returns status in body
- Starting: Setup in progress
- Ready: Accepting predictions
- Busy: Ready but all slots in use (HTTP 409 on new predictions)
- SetupFailed: setup() raised an exception
- Defunct: Unrecoverable error
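The health diagram can be encoded as an explicit transition check. In this hypothetical sketch, `can_transition_to` and `accepts_predictions` are invented names. The edges Ready/Busy → Defunct are an assumption drawn from the worker-crash behavior described under Shutdown, not from the diagram itself.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Health { Unknown, Starting, Ready, Busy, SetupFailed, Defunct }

impl Health {
    // Edges from the diagram, plus Ready/Busy -> Defunct for a worker crash (assumed).
    fn can_transition_to(self, next: Health) -> bool {
        use Health::*;
        matches!(
            (self, next),
            (Unknown, Starting)
                | (Starting, Ready)
                | (Starting, SetupFailed)
                | (Ready, Busy)
                | (Busy, Ready)
                | (SetupFailed, Defunct)
                | (Ready, Defunct)
                | (Busy, Defunct)
        )
    }
    // Only Ready admits a new prediction; Busy maps to HTTP 409.
    fn accepts_predictions(self) -> bool {
        self == Health::Ready
    }
}
```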
Prediction States
Starting ──▶ Processing ──┬──▶ Succeeded
├──▶ Failed
└──▶ Canceled
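The prediction lifecycle is a small state machine with three terminal states. The sketch below encodes only the edges drawn above. `PredictionStatus`, `advance` and `is_terminal` are hypothetical names, not the crate's confirmed API.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PredictionStatus { Starting, Processing, Succeeded, Failed, Canceled }

impl PredictionStatus {
    fn is_terminal(self) -> bool {
        matches!(self, PredictionStatus::Succeeded | PredictionStatus::Failed | PredictionStatus::Canceled)
    }
    // Applies a transition, rejecting any edge not drawn in the diagram.
    fn advance(self, next: PredictionStatus) -> Result<PredictionStatus, String> {
        use PredictionStatus::*;
        match (self, next) {
            (Starting, Processing) => Ok(next),
            (Processing, Succeeded | Failed | Canceled) => Ok(next),
            _ => Err(format!("invalid transition {:?} -> {:?}", self, next)),
        }
    }
}
```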
Cancellation
- HTTP DELETE /predictions/{id} or PUT /predictions/{id}/cancel
- Parent sends ControlRequest::Cancel { slot }
- Worker calls handler.cancel(slot)
- For sync predictors: SIGUSR1 raises KeyboardInterrupt
- For async predictors: future.cancel() on the asyncio task
- Prediction returns with SlotResponse::Cancelled
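On the parent side, cancellation combines the two mechanisms listed for PredictionService: a local cancel token and delegation to the worker by slot. The following is a minimal sketch under assumptions. `InFlight`, this `cancel` function and the single-variant `ControlRequest` are hypothetical, and an `Arc<AtomicBool>` stands in for whatever token type the service actually uses.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq)]
enum ControlRequest {
    Cancel { slot: usize },
}

// Hypothetical record of a running prediction.
struct InFlight {
    slot: usize,
    token: Arc<AtomicBool>,
}

// Handles a cancel request for `id`: trip the local token, then build the
// control message that asks the worker to cancel that slot.
// Returns None if the prediction is unknown (already finished, for example).
fn cancel(in_flight: &HashMap<String, InFlight>, id: &str) -> Option<ControlRequest> {
    let p = in_flight.get(id)?;
    p.token.store(true, Ordering::SeqCst);
    Some(ControlRequest::Cancel { slot: p.slot })
}
```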
Shutdown
Graceful (SIGTERM with await_explicit_shutdown):
- Stop accepting new predictions
- Wait for in-flight predictions to complete
- Send ControlRequest::Shutdown
- Worker responds with ShuttingDown and exits
- Parent exits
Immediate (SIGTERM without flag):
- Send ControlRequest::Shutdown
- Cancel in-flight predictions
- Exit
Worker crash:
- Control channel closes
- Event loop detects, fails all in-flight predictions
- Health → Defunct
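The graceful path is essentially a drain: stop admitting new work, then block until the in-flight count reaches zero. This is a std-only sketch using `Mutex` and `Condvar`. `Drain`, `begin`, `finish` and `shutdown_graceful` are hypothetical names. The service described above is async (its methods are awaited), so a real implementation would likely use async primitives instead of blocking a thread.

```rust
use std::sync::{Condvar, Mutex};

// Hypothetical drain helper. State is (accepting, in_flight).
struct Drain {
    state: Mutex<(bool, usize)>,
    zero: Condvar,
}

impl Drain {
    fn new() -> Self {
        Drain { state: Mutex::new((true, 0)), zero: Condvar::new() }
    }
    // Admits a prediction only while still accepting.
    fn begin(&self) -> bool {
        let mut s = self.state.lock().unwrap();
        if !s.0 {
            return false;
        }
        s.1 += 1;
        true
    }
    fn finish(&self) {
        let mut s = self.state.lock().unwrap();
        s.1 -= 1;
        if s.1 == 0 {
            self.zero.notify_all();
        }
    }
    // Stop accepting, then wait until every in-flight prediction has finished.
    // After this returns, the parent would send ControlRequest::Shutdown.
    fn shutdown_graceful(&self) {
        let mut s = self.state.lock().unwrap();
        s.0 = false;
        while s.1 > 0 {
            s = self.zero.wait(s).unwrap();
        }
    }
}
```

The `while` loop around `wait` guards against spurious wakeups, which `Condvar` permits.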
Slot Poisoning
If a slot socket has an error (for example, a failed write), the slot is marked poisoned and won't receive new predictions. If all slots are poisoned, the worker exits.
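Poisoning can be modeled as a sticky flag set on the first write error, with slot selection skipping poisoned slots. Running out of healthy slots is the signal to exit. This is a hypothetical, std-only sketch. `Slot`, `send` and `next_healthy` are invented names, and a generic `Write` stands in for the Unix socket writer.

```rust
use std::io::{self, Write};

// Hypothetical slot wrapper: any write error poisons the slot permanently.
struct Slot<W: Write> {
    writer: W,
    poisoned: bool,
}

impl<W: Write> Slot<W> {
    fn send(&mut self, line: &str) -> io::Result<()> {
        if self.poisoned {
            return Err(io::Error::new(io::ErrorKind::Other, "slot poisoned"));
        }
        let res = self.writer.write_all(line.as_bytes()).and_then(|_| self.writer.flush());
        if res.is_err() {
            self.poisoned = true; // never used again after a failed write
        }
        res
    }
}

// Picks the first healthy slot; None means every slot is poisoned and the worker should exit.
fn next_healthy<W: Write>(slots: &mut [Slot<W>]) -> Option<&mut Slot<W>> {
    slots.iter_mut().find(|s| !s.poisoned)
}
```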