Siphonophore
Extensible Yjs sync server primitive.
This aims to be an alternative to Hocuspocus, but with native document multiplexing in a single WebSocket connection.
Features
- Multiplexing: Multiple documents over a single WebSocket connection
- Hooks: Extend behavior without your application logic.
- Actor Model: Built with the awesome libraries of Kameo and Yrs
- Axum Integration: Compose with your existing HTTP routes
- Dead simple multiplexing protocol: Can be used in any YJS compatible client, with minimal effort.
Projects Using Siphonophore
Quick Start
use siphonophore::Server;
#[tokio::main]
async fn main() {
Server::new()
.serve("0.0.0.0:8080")
.await
.unwrap();
}
With Persistence
use siphonophore::{Server, Hook, HookResult, OnLoadDocumentPayload, BeforeCloseDirtyPayload};
use async_trait::async_trait;
struct FileStorage;
#[async_trait]
impl Hook for FileStorage {
async fn on_load_document(&self, p: OnLoadDocumentPayload<'_>)
-> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>>
{
let path = format!("data/{}.bin", p.doc_id);
match std::fs::read(&path) {
Ok(data) => Ok(Some(data)),
Err(_) => Ok(None),
}
}
async fn before_close_dirty(&self, p: BeforeCloseDirtyPayload<'_>) -> HookResult {
let path = format!("data/{}.bin", p.doc_id);
std::fs::write(&path, p.state)?;
Ok(())
}
}
#[tokio::main]
async fn main() {
Server::with_hooks(vec![Box::new(FileStorage)])
.serve("0.0.0.0:8080")
.await
.unwrap();
}
Authentication
Use the on_authenticate hook to validate and store user info in the context for later hooks.
use siphonophore::{Server, Hook, HookResult, OnAuthenticatePayload, OnChangePayload};
use async_trait::async_trait;
#[derive(Clone)]
struct User {
id: String,
name: String,
}
struct AuthHook {
}
#[async_trait]
impl Hook for AuthHook {
async fn on_authenticate(&self, p: OnAuthenticatePayload<'_>) -> HookResult {
let token = p.request.token.as_ref()
.ok_or("No token provided")?;
let user = validate_token(token).await
.map_err(|_| "Invalid token")?;
if !user_can_access(&user, p.doc_id) {
return Err("Access denied".into());
}
p.context.insert(user);
Ok(())
}
async fn on_change(&self, p: OnChangePayload<'_>) -> HookResult {
if let Some(user) = p.context.get::<User>() {
println!("{} edited {}", user.name, p.doc_id);
}
Ok(())
}
}
async fn validate_token(token: &str) -> Result<User, ()> {
Ok(User { id: "123".into(), name: "Alice".into() })
}
fn user_can_access(user: &User, doc_id: &str) -> bool {
true
}
#[tokio::main]
async fn main() {
Server::with_hooks(vec![Box::new(AuthHook {})])
.serve("0.0.0.0:8080")
.await
.unwrap();
}
Client-side:
const ws = new WebSocket('ws://localhost:8080/ws?token=your-jwt-token')
const ws = new WebSocket('ws://localhost:8080/ws', {
headers: { 'Authorization': 'Bearer your-jwt-token' }
})
Custom WebSocket Path
use siphonophore::Server;
#[tokio::main]
async fn main() {
let app = Server::new().into_router_at("/sync/websocket");
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Composing with Axum
use siphonophore::Server;
use axum::{Router, routing::get};
#[tokio::main]
async fn main() {
let server = Server::new();
let handle = server.handle();
let app = Router::new()
.merge(server.into_router_at("/collab")) .route("/health", get(|| async { "ok" }))
.route("/save/:doc", get(move |path: axum::extract::Path<String>| {
let h = handle.clone();
async move { h.persist_document(&path).await; "saved" }
}));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Wire Protocol
Messages are prefixed with doc_id for multiplexing:
[doc_id_len: u8][doc_id: bytes][yjs_payload: bytes]
Control Messages (Text WebSocket)
{"action": "leave", "doc": "my-document"}
{"action": "save", "doc": "my-document"}
Hooks
| Hook |
When |
Use Case |
on_connect |
Client tries to access doc |
Rate limiting, logging |
on_authenticate |
After connect |
Auth, set user context |
on_load_document |
Doc first loaded |
Load from storage |
on_change |
Every update |
Real-time webhooks |
on_disconnect |
Client leaves doc |
Analytics |
on_save |
Explicit save request |
Checkpoints |
before_close_dirty |
Before unloading dirty doc |
Lazypersistence |
after_unload_document |
Doc fully unloaded |
Cache invalidation |
Client Example (JavaScript)
import * as Y from 'yjs'
const doc = new Y.Doc()
const ws = new WebSocket('ws://localhost:8080/ws')
ws.binaryType = 'arraybuffer'
ws.onmessage = (event) => {
const data = new Uint8Array(event.data)
const docIdLen = data[0]
const docId = new TextDecoder().decode(data.slice(1, 1 + docIdLen))
const payload = data.slice(1 + docIdLen)
}
function send(docId, payload) {
const encoder = new Uint8Array(1 + docId.length + payload.length)
encoder[0] = docId.length
encoder.set(new TextEncoder().encode(docId), 1)
encoder.set(payload, 1 + docId.length)
ws.send(encoder)
}
License
MIT OR Apache-2.0