pjson-rs 0.4.1

Priority JSON Streaming Protocol - high-performance priority-based JSON streaming (requires nightly Rust)

PJS - Priority JSON Streaming Protocol


🚀 6.3x faster than serde_json | 🎯 5.3x faster progressive loading | 💾 Bounded memory usage | 🏗️ Production Ready

New in v0.3.0: Production-ready code quality with zero clippy warnings, Clean Architecture compliance, and comprehensive test coverage (196 tests). Now requires nightly Rust for zero-cost abstractions.

🌟 Key Features

⚡ Blazing Fast

  • 6.3x faster than serde_json
  • 1.71 GiB/s throughput
  • SIMD-accelerated parsing

🎯 Smart Streaming

  • Skeleton-first delivery
  • Priority-based transmission
  • Progressive enhancement

💾 Memory Efficient

  • 5.3x faster progressive loading
  • Bounded memory usage
  • Zero-copy operations

🔧 Production Ready

  • All tests passing
  • Clean Architecture

📊 Schema Aware

  • Automatic compression
  • Semantic analysis
  • Type optimization

🚀 Developer Friendly

  • Simple API
  • Drop-in replacement
  • Extensive documentation

🎯 The Problem

Modern web applications face a fundamental challenge: large JSON responses block UI rendering.

Current State

  • 📊 Analytics dashboard loads 5MB of JSON
  • ⏱ïļ User waits 2-3 seconds seeing nothing
  • ðŸ˜Ī User thinks app is broken and refreshes
  • 🔄 The cycle repeats

Why existing solutions fall short

| Solution | Problem |
|----------|---------|
| Pagination | Requires multiple round-trips, complex state management |
| GraphQL | Still sends complete response, just smaller |
| JSON streaming | No semantic understanding, can't prioritize |
| Compression | Reduces size but not time-to-first-byte |

✨ The Solution: PJS

PJS splits a JSON response into prioritized frames. Fields you mark as critical are sent first, so the UI can render immediately, and lower-priority data follows as it becomes available.

Core Innovation: Semantic Prioritization

#[derive(JsonPriority)]
struct UserDashboard {
    // Critical fields: sent in the first 10ms
    #[priority(critical)]
    user_id: u64,
    #[priority(critical)]
    user_name: String,

    // High-priority fields: sent in the next 50ms
    #[priority(high)]
    recent_activity: Vec<Activity>,
    #[priority(high)]
    notifications: Vec<Notification>,

    // Low-priority fields: sent when available
    #[priority(low)]
    detailed_analytics: Analytics,  // 4MB of data
}
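
To make the ordering concrete, here is a minimal standard-library sketch of priority-first transmission. The `Priority` enum, `Field` struct, and `transmission_order` function are hypothetical names chosen for illustration; they are not the `pjson-rs` API, and the real derive macro may work differently.

```rust
/// Hypothetical priority levels, declared from least to most urgent so the
/// derived ordering gives Low < High < Critical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Low,
    High,
    Critical,
}

/// A named field tagged with its priority (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: &'static str,
    priority: Priority,
}

/// Returns field names in the order they would be sent: highest priority
/// first, keeping declaration order among fields of equal priority.
fn transmission_order(mut fields: Vec<Field>) -> Vec<&'static str> {
    // `sort_by` is a stable sort, so ties keep their original relative order.
    fields.sort_by(|a, b| b.priority.cmp(&a.priority));
    fields.into_iter().map(|f| f.name).collect()
}
```

Under these assumptions, the 4MB analytics field always ends up last, whatever its position in the struct.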

Real-World Impact

Traditional JSON Loading:
[████████████████████] 100% - 2000ms - Full UI renders

PJS Loading:
[██░░░░░░░░░░░░░░░░░░] 10%  - 10ms   - Critical UI visible
[██████░░░░░░░░░░░░░░] 30%  - 50ms   - Interactive UI
[████████████████████] 100% - 2000ms - Full data loaded

User Experience: ⚡ Instant → 😊 Happy

Key Features

🚀 Complete HTTP Server Integration

Production-ready Axum integration with full REST API, session management, and real-time streaming.

🎯 Advanced Streaming Implementations

  • AdaptiveFrameStream: Client capability-based optimization
  • BatchFrameStream: High-throughput batch processing
  • PriorityFrameStream: Priority-based frame ordering with buffering

🏗ïļ Domain-Driven Design Architecture

Clean architecture with CQRS pattern, event sourcing, and ports & adapters for maximum testability and maintainability.

📊 Production-Ready Infrastructure

  • Thread-safe in-memory storage and metrics collection
  • Event publishing with subscription support
  • Prometheus metrics integration
  • Comprehensive middleware stack (CORS, security, compression)

🔄 Multiple Response Formats

Automatic format detection supporting JSON, NDJSON, and Server-Sent Events based on client Accept headers.
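
A rough sketch of how Accept-based detection could work. The `ResponseFormat` enum and `detect_format` function are hypothetical, the `application/x-ndjson` media type is an assumption (the README does not say which type the server matches), and quality values (`q=`) are ignored for brevity.

```rust
/// Hypothetical set of response formats named in this README.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponseFormat {
    Json,
    Ndjson,
    ServerSentEvents,
}

/// Picks a response format from an HTTP Accept header value.
/// Falls back to plain JSON when nothing more specific is requested.
fn detect_format(accept: &str) -> ResponseFormat {
    // Normalize each entry: drop parameters such as `;q=0.9`, trim, lowercase.
    let media_types: Vec<String> = accept
        .split(',')
        .map(|part| part.split(';').next().unwrap_or("").trim().to_ascii_lowercase())
        .collect();

    if media_types.iter().any(|m| m == "text/event-stream") {
        ResponseFormat::ServerSentEvents
    } else if media_types.iter().any(|m| m == "application/x-ndjson") {
        ResponseFormat::Ndjson
    } else {
        ResponseFormat::Json
    }
}
```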

⚡ SIMD-Accelerated Parsing

Powered by sonic-rs for blazing fast JSON processing with zero-copy operations.

🔄 Real-Time WebSocket Streaming

Complete WebSocket implementation with priority-based frame delivery:

  • Session Management: Track active WebSocket connections with metrics
  • Priority-Based Delivery: Critical data sent first with adaptive delays
  • Schema-Based Compression: Intelligent compression using multiple strategies
  • Progressive Enhancement: Skeleton-first streaming with incremental updates
  • Demo Servers: Interactive demonstrations of real-time streaming capabilities
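
Skeleton-first streaming can be pictured as sending the shape of the document before its contents. The sketch below assumes a skeleton keeps object keys, blanks scalars, and empties arrays; the actual skeleton format used by PJS is not specified here, and the `Value` enum is a stand-in defined only for this example.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value type, defined here so the example is self-contained.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Number(f64),
    Text(String),
    Array(Vec<Value>),
    Object(BTreeMap<String, Value>),
}

/// Builds a skeleton: same object keys, scalars replaced by placeholders,
/// arrays emptied. A client could use it to lay out the UI before data arrives.
fn skeleton(value: &Value) -> Value {
    match value {
        Value::Null => Value::Null,
        Value::Number(_) => Value::Number(0.0),
        Value::Text(_) => Value::Text(String::new()),
        Value::Array(_) => Value::Array(Vec::new()),
        Value::Object(map) => Value::Object(
            map.iter().map(|(k, v)| (k.clone(), skeleton(v))).collect(),
        ),
    }
}
```

Later frames would then fill in the placeholders, highest priority first.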

🎉 What's New in v0.3.0

🛠ïļ Production-Ready Code Quality

  • Zero Clippy Warnings: All 44+ clippy warnings resolved across entire codebase
  • Modern Format Strings: Updated to format!("{var}") syntax throughout
  • Enhanced Error Handling: Proper Result patterns and async trait compatibility
  • Memory Safety: Fixed await-holding lock patterns and buffer alignment issues
  • 196 Tests Passing: Complete test suite with all features enabled

🏗ïļ Clean Architecture Enforcement

  • Domain Layer Isolation: Custom JsonData value object replacing serde_json::Value
  • Type Safety: Eliminated all architecture violations in domain layer
  • Seamless Conversion: From trait implementations for JsonData ↔ serde_json::Value
  • Proper Boundaries: Clear separation between domain and infrastructure errors

🌐 HTTP/WebSocket Modernization

  • Axum v0.8 Compatibility: Updated route syntax from :param to {param} format
  • StreamExt Integration: Fixed async stream processing with proper trait imports
  • Body Type Updates: Modern HTTP body handling for latest axum/hyper versions
  • All Tests Passing: Complete HTTP integration test suite validation

🔧 Technical Debt Resolution

  • Architecture Compliance: Resolved all Clean Architecture violations
  • Lint Standards: Zero warnings with strict linting enabled (-D warnings)
  • Async Patterns: Fixed await-across-locks and other async safety issues
  • Type System: Enhanced type safety with better generic bounds and aliases

Benchmarks

🚀 Actual Performance Results

| Metric | serde_json | sonic-rs | PJS | PJS Advantage |
|--------|------------|----------|-----|---------------|
| Small JSON (43B) | 275ns | 129ns | 312ns | Competitive |
| Medium JSON (351B) | 1,662ns | 434ns | 590ns | 2.8x vs serde |
| Large JSON (357KB) | 1,294μs | 216μs | 204μs | 6.3x vs serde, 1.06x vs sonic |
| Memory Efficiency | Baseline | Fast | 5.3x faster progressive | Bounded memory |
| Progressive Loading | Batch-only | Batch-only | 37μs vs 198μs | 5.3x faster |

🎯 Key Performance Achievements

  • 6.3x faster than serde_json for large JSON processing
  • 1.06x faster than sonic-rs (SIMD library) on large datasets
  • 5.3x faster progressive loading vs traditional batch processing
  • 1.71 GiB/s sustained throughput (exceeding sonic-rs 1.61 GiB/s)
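
The headline ratios follow directly from the timings in the benchmark table. A small sketch that recomputes them (the `speedup` and `headline_ratios` helpers are illustrative, not part of the benchmark suite):

```rust
/// Speedup of `candidate` over `baseline`, with both timings in the same unit.
fn speedup(baseline: f64, candidate: f64) -> f64 {
    baseline / candidate
}

/// Recomputes the quoted ratios from the table's timings.
fn headline_ratios() -> [(&'static str, f64); 4] {
    [
        // Large JSON (357KB), microseconds
        ("large JSON, PJS vs serde_json", speedup(1294.0, 204.0)), // about 6.34
        ("large JSON, PJS vs sonic-rs", speedup(216.0, 204.0)),    // about 1.06
        // Medium JSON (351B), nanoseconds
        ("medium JSON, PJS vs serde_json", speedup(1662.0, 590.0)), // about 2.82
        // Progressive loading, microseconds
        ("progressive vs batch loading", speedup(198.0, 37.0)), // about 5.35
    ]
}
```

Note that on the small 43B input the same calculation gives a ratio below 1 (275ns / 312ns), which is why that row is labeled "Competitive" and not as a speedup.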

Installation

Add PJS to your Cargo.toml:

[dependencies]
pjson-rs = "0.3.0"

# Optional: for HTTP server integration
axum = "0.8"
tokio = { version = "1", features = ["full"] }

Or use cargo:

cargo add pjson-rs

Quick Start

HTTP Server with Axum Integration

use std::sync::Arc;
use pjson_rs::{
    application::{
        handlers::{InMemoryCommandHandler, InMemoryQueryHandler},
        services::{SessionService, StreamingService},
    },
    infrastructure::{
        adapters::{InMemoryStreamRepository, InMemoryEventPublisher, InMemoryMetricsCollector},
        http::axum_adapter::{create_pjs_router, PjsAppState},
    },
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Create infrastructure
    let repository = Arc::new(InMemoryStreamRepository::new());
    let event_publisher = Arc::new(InMemoryEventPublisher::new());
    let metrics_collector = Arc::new(InMemoryMetricsCollector::new());
    
    // Create CQRS handlers
    let command_handler = Arc::new(InMemoryCommandHandler::new(
        repository.clone(), event_publisher, metrics_collector.clone()
    ));
    let query_handler = Arc::new(InMemoryQueryHandler::new(repository, metrics_collector));
    
    // Create services
    let session_service = Arc::new(SessionService::new(command_handler.clone(), query_handler.clone()));
    let streaming_service = Arc::new(StreamingService::new(command_handler));
    
    // Build Axum app
    let app = create_pjs_router()
        .with_state(PjsAppState::new(session_service, streaming_service));
    
    // Start server
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    println!("🚀 PJS Server running on http://127.0.0.1:3000");
    axum::serve(listener, app).await?;
    
    Ok(())
}

JavaScript/TypeScript Client Library

Use the official PJS client library for seamless integration:

npm install @pjs/client

HTTP Streaming

import { PjsClient, createHttpTransport } from '@pjs/client';

// Create client with HTTP transport
const client = new PjsClient({
    transport: createHttpTransport({
        baseUrl: 'http://localhost:3000',
        format: 'sse' // or 'json', 'ndjson'
    })
});

// Stream data with priority-based delivery
await client.stream({
    data: { 
        users: [/* large array */],
        dashboard: { /* complex object */ }
    },
    onFrame: (frame) => {
        // Frames arrive in priority order
        if (frame.priority >= 90) {
            updateUI(frame.data); // Critical data first
        }
    }
});

WebSocket Real-Time Streaming

import { PjsClient, createWebSocketTransport } from '@pjs/client';

const client = new PjsClient({
    transport: createWebSocketTransport({
        url: 'ws://localhost:3001/ws'
    })
});

// Real-time streaming with priority handling
await client.connect();

client.onFrame((frame) => {
    console.log(`Priority ${frame.priority}:`, frame.data);
    
    // Handle based on priority
    switch (frame.priority) {
        case 'critical':
            showImmediate(frame.data);
            break;
        case 'high':
            queueForNextFrame(frame.data);
            break;
        default:
            processInBackground(frame.data);
    }
});

WebSocket Streaming

use pjson_rs::{
    ApplicationResult,
    domain::value_objects::SessionId,
};
use futures::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> ApplicationResult<()> {
    // Connect to WebSocket streaming server
    let (ws_stream, _) = connect_async("ws://127.0.0.1:3001/ws")
        .await
        .expect("Failed to connect");
    
    // Only the read half is used in this example; the write half is unused
    let (_write, mut read) = ws_stream.split();
    
    // Receive prioritized frames
    while let Some(message) = read.next().await {
        match message? {
            Message::Text(text) => {
                let frame: serde_json::Value = serde_json::from_str(&text)?;
                
                match frame["@type"].as_str() {
                    Some("pjs_frame") => {
                        let priority = frame["@priority"].as_u64().unwrap_or(0);
                        
                        if priority >= 200 {
                            println!("🚨 Critical data: {}", frame["data"]);
                        } else if priority >= 100 {
                            println!("📊 High priority: {}", frame["data"]);
                        } else {
                            println!("📝 Background data received");
                        }
                    }
                    Some("stream_complete") => {
                        println!("✅ Stream completed!");
                        break;
                    }
                    _ => {}
                }
            }
            _ => {}
        }
    }
    
    Ok(())
}

Demo Servers

Start the interactive demos to see PJS in action:

# WebSocket streaming server with priority-based delivery
cargo run --bin websocket_streaming --manifest-path crates/pjs-demo/Cargo.toml

# Interactive demo with HTML interface and real-time visualization
cargo run --bin interactive_demo --manifest-path crates/pjs-demo/Cargo.toml

# Simple demo server with basic streaming
cargo run --bin simple_demo --manifest-path crates/pjs-demo/Cargo.toml

# Performance comparison demo (PJS vs traditional JSON)
cargo run --bin performance_comparison --manifest-path crates/pjs-demo/Cargo.toml

Or run root-level examples:

# Complete Axum HTTP server
cargo run --example axum_server

# Advanced streaming demo server  
cargo run --example streaming_demo_server

# Simple usage patterns
cargo run --example simple_usage

Then visit http://127.0.0.1:3000 to see priority-based streaming in action.

Use Cases

Perfect for:

  • 📊 Real-time dashboards - Show key metrics instantly
  • 📱 Mobile apps - Optimize for slow networks
  • 🛍️ E-commerce - Load product essentials first
  • 📈 Financial platforms - Prioritize critical trading data
  • 🎮 Gaming leaderboards - Show player's rank immediately

Architecture

PJS implements a clean, layered architecture following Domain-Driven Design principles:

1. Domain Layer

Core business logic with value objects (Priority, SessionId, JsonPath) and aggregates (StreamSession) ensuring data consistency.
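
A value object validates its input once, at construction, so the rest of the domain can trust it. The sketch below shows the pattern with two stand-in types; the invariants chosen (non-zero priority, paths starting with `$`) are assumptions for illustration and may not match the crate's actual Priority and JsonPath types.

```rust
/// Priority value object: any `u8` except zero (illustrative invariant).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Priority(u8);

impl Priority {
    fn new(value: u8) -> Result<Self, String> {
        if value == 0 {
            Err("priority must be greater than zero".to_string())
        } else {
            Ok(Priority(value))
        }
    }

    fn value(self) -> u8 {
        self.0
    }
}

/// JSON path value object: must start at the document root `$`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct JsonPath(String);

impl JsonPath {
    fn parse(raw: &str) -> Result<Self, String> {
        if raw.starts_with('$') {
            Ok(JsonPath(raw.to_string()))
        } else {
            Err(format!("path must start with '$': {raw}"))
        }
    }

    fn as_str(&self) -> &str {
        &self.0
    }
}
```

Because the inner fields are private to the type, an invalid priority or path cannot be constructed by code outside the defining module.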

2. Application Layer

CQRS pattern with separate Command and Query handlers, plus high-level services (SessionService, StreamingService) orchestrating workflows.
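
The command/query split can be shown in a few lines. This is a generic CQRS sketch with hypothetical `Command`, `Query`, and `SessionStore` types; the crate's handlers are async and considerably richer.

```rust
use std::collections::HashMap;

/// Commands change state.
enum Command {
    CreateSession { id: String },
    CloseSession { id: String },
}

/// Queries read state and never modify it.
enum Query {
    IsActive { id: String },
    ActiveCount,
}

#[derive(Debug, PartialEq)]
enum QueryResult {
    Bool(bool),
    Count(usize),
}

/// In-memory state shared by both sides (session id -> active flag).
#[derive(Default)]
struct SessionStore {
    active: HashMap<String, bool>,
}

impl SessionStore {
    /// Write side: requires `&mut self`.
    fn handle_command(&mut self, command: Command) {
        match command {
            Command::CreateSession { id } => {
                self.active.insert(id, true);
            }
            Command::CloseSession { id } => {
                self.active.insert(id, false);
            }
        }
    }

    /// Read side: takes `&self`, so the compiler rejects accidental mutation.
    fn handle_query(&self, query: Query) -> QueryResult {
        match query {
            Query::IsActive { id } => {
                QueryResult::Bool(self.active.get(&id).copied().unwrap_or(false))
            }
            Query::ActiveCount => {
                QueryResult::Count(self.active.values().filter(|a| **a).count())
            }
        }
    }
}
```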

3. Infrastructure Layer

Adapters implementing domain ports:

  • Storage: In-memory repositories with thread-safe concurrent access
  • Events: Publisher/subscriber pattern for domain event distribution
  • Metrics: Performance monitoring with Prometheus integration
  • HTTP: Complete Axum server with middleware stack
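
As an illustration of the storage adapter idea, a thread-safe in-memory repository can be built from `Arc<RwLock<HashMap>>`. The `SketchRepository` type and the `concurrent_writes` helper are hypothetical; the crate's InMemoryStreamRepository may use a different structure.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

/// Cloneable handle to a shared map of session id -> frames sent.
#[derive(Clone, Default)]
struct SketchRepository {
    inner: Arc<RwLock<HashMap<String, u64>>>,
}

impl SketchRepository {
    fn save(&self, session_id: &str, frames_sent: u64) {
        // Writers take the exclusive lock.
        self.inner
            .write()
            .expect("lock poisoned")
            .insert(session_id.to_string(), frames_sent);
    }

    fn find(&self, session_id: &str) -> Option<u64> {
        // Readers share the lock, so lookups do not block each other.
        self.inner.read().expect("lock poisoned").get(session_id).copied()
    }

    fn len(&self) -> usize {
        self.inner.read().expect("lock poisoned").len()
    }
}

/// Saves one session per thread and returns how many sessions were stored.
fn concurrent_writes(repo: &SketchRepository, writers: u64) -> usize {
    let handles: Vec<_> = (0..writers)
        .map(|i| {
            let repo = repo.clone();
            thread::spawn(move || repo.save(&format!("sess_{i}"), i))
        })
        .collect();
    for handle in handles {
        handle.join().expect("writer thread panicked");
    }
    repo.len()
}
```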

4. Transport Abstraction

Multi-format streaming support:

  • JSON: Standard response format
  • NDJSON: Newline-delimited for efficient processing
  • Server-Sent Events: Real-time browser compatibility
  • Automatic format detection via Accept headers
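
The wire difference between NDJSON and Server-Sent Events is small. The sketch below frames already-serialized JSON strings in both formats; the function names are illustrative, and it assumes each frame is compact JSON with no embedded newlines.

```rust
/// NDJSON: one JSON document per line.
fn to_ndjson(frames: &[&str]) -> String {
    frames.iter().map(|frame| format!("{frame}\n")).collect()
}

/// Server-Sent Events: each event is a `data:` line followed by a blank line.
fn to_sse(frames: &[&str]) -> String {
    frames.iter().map(|frame| format!("data: {frame}\n\n")).collect()
}
```

Either output can be written to the response body incrementally, one frame at a time, which is what makes both formats suitable for progressive delivery.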

5. Advanced Streaming

Intelligent frame processing:

  • Priority-based delivery: Critical data first
  • Adaptive buffering: Dynamic sizing based on client performance
  • Batch processing: High-throughput chunk aggregation
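
Batching and adaptive buffering can each be sketched in a few lines. Both functions are hypothetical, and the latency thresholds (20ms, 100ms) and size limits (1 to 64) are made-up values for illustration, not the crate's tuning.

```rust
/// Groups frames into batches of at most `batch_size` items.
fn batch_frames<T: Clone>(frames: &[T], batch_size: usize) -> Vec<Vec<T>> {
    assert!(batch_size > 0, "batch_size must be positive");
    frames.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

/// Picks the next buffer size from observed client latency:
/// halve it for slow clients, double it for fast ones, keep it otherwise.
fn next_buffer_size(current: usize, latency_ms: u64) -> usize {
    let proposed = if latency_ms > 100 {
        current / 2
    } else if latency_ms < 20 {
        current * 2
    } else {
        current
    };
    // Keep the result within fixed bounds so it never reaches zero or grows unbounded.
    proposed.clamp(1, 64)
}
```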

Technical Architecture

pjs/
├── crates/
│   ├── pjs-core/           # Core protocol, domain logic, and HTTP integration
│   │   ├── src/
│   │   │   ├── application/    # CQRS handlers, services, DTOs
│   │   │   ├── domain/         # Value objects, entities, aggregates
│   │   │   ├── infrastructure/ # HTTP, WebSocket, repositories, adapters
│   │   │   ├── parser/         # SIMD, zero-copy, buffer pools
│   │   │   ├── stream/         # Priority streaming, reconstruction
│   │   │   └── compression/    # Schema-based compression
│   │   ├── examples/           # Standalone demos (zero-copy, compression)
│   │   └── tests/              # Integration tests
│   ├── pjs-demo/           # Interactive demo servers with WebSocket streaming
│   │   └── src/
│   │       ├── servers/        # Demo server implementations
│   │       ├── clients/        # WebSocket client demos  
│   │       ├── data/           # Sample data generators (analytics, ecommerce)
│   │       └── static/         # HTML interfaces
│   ├── pjs-js-client/      # JavaScript/TypeScript client library ✅ IMPLEMENTED
│   │   ├── src/            # TypeScript source code with transport layers
│   │   ├── tests/          # Jest test suite with full coverage
│   │   └── package.json    # NPM configuration and dependencies
│   └── pjs-bench/          # Benchmarking suite
│       └── benches/        # Criterion.rs performance benchmarks
└── examples/               # Root-level usage examples
    ├── axum_server.rs      # Complete HTTP server demo
    ├── simple_usage.rs     # Basic usage patterns  
    └── streaming_demo_server.rs # Advanced streaming demo

Current Implementation Status

  • Phase 1: ✅ Core foundation (100% complete)
  • Phase 2: ✅ Protocol layer (100% complete)
  • Phase 3: ✅ Client/Server framework (100% complete)
  • Phase 4: ✅ Transport layer (100% complete)
  • Phase 5: ✅ Production features (100% complete)
  • Phase 6: ✅ Real-Time Streaming (100% complete)
  • Phase 7: ✅ JavaScript/TypeScript Client SDK (100% complete)
  • Phase 8: ✅ Code Quality & Production Readiness (100% complete)
  • Overall: ~98% of core functionality implemented

Implemented Components

  • ✅ pjs-core: Complete Rust implementation with Clean Architecture
  • ✅ pjs-demo: Interactive demo servers with real-time WebSocket streaming
  • ✅ pjs-js-client: Full TypeScript/JavaScript client library with transport layers
  • ✅ pjs-bench: Comprehensive benchmarking suite with performance validation
  • ✅ Examples: Multiple working examples from simple to advanced usage

API Examples

HTTP Endpoints

The server provides a complete REST API:

# Create a new session
POST /pjs/sessions
Content-Type: application/json
{
  "max_concurrent_streams": 10,
  "timeout_seconds": 3600,
  "client_info": "My App v1.0"
}

# Response: { "session_id": "sess_abc123", "expires_at": "..." }

# Get session info  
GET /pjs/sessions/{session_id}

# Start streaming data
POST /pjs/stream/{session_id}
Content-Type: application/json
{
  "data": { "users": [...], "products": [...] },
  "priority_threshold": 50,
  "max_frames": 100
}

# Stream frames (JSON format)
GET /pjs/stream/{session_id}/frames?format=json&priority=80

# Real-time Server-Sent Events
GET /pjs/stream/{session_id}/sse
Accept: text/event-stream

# System health check
GET /pjs/health
# Response: { "status": "healthy", "version": "0.3.0" }

Working Example

A complete working server is available at examples/axum_server.rs. To run it:

# Start the server
cargo run --example axum_server

# Test endpoints
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Check health  
curl http://localhost:3000/pjs/health

# View metrics
curl http://localhost:3000/examples/metrics

Performance Goals

  • Throughput: >4 GB/s with sonic-rs
  • Time to First Byte: <10ms for critical data
  • Memory Efficiency: 5-10x reduction vs traditional parsing
  • CPU Utilization: Full SIMD acceleration

Building

Prerequisites

  • Rust nightly (required for impl Trait in associated types)
  • CPU with AVX2 support (recommended for SIMD acceleration)

Setting up Rust Nightly

# Install nightly Rust
rustup install nightly

# Set nightly for this project
rustup override set nightly

# Or use nightly globally
rustup default nightly

Quick Start

# Clone repository
git clone https://github.com/bug-ops/pjs
cd pjs

# Ensure nightly Rust is active
rustup override set nightly

# Build with optimizations
cargo build --release

# Run tests
cargo test --workspace

# Run the complete HTTP server example
cargo run --example axum_server

# Build with optional features
cargo build --features "http-client,prometheus-metrics"

Feature Flags

  • http-client: Enable HTTP-based event publishing
  • prometheus-metrics: Enable Prometheus metrics collection
  • simd-auto: Auto-detect best SIMD support (default)
  • compression: Enable compression middleware

Framework Integration

Universal Integration Layer with Zero-Cost Abstractions

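PJS uses nightly Rust features to avoid boxing futures at framework boundaries. The Universal Framework Integration Layer combines Generic Associated Types (GATs) with impl Trait in associated types (the part that still requires nightly), so each adapter returns a concrete, statically dispatched future instead of a heap-allocated boxed one: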

use pjson_rs::infrastructure::integration::StreamingAdapter;
use std::future::Future;

// Zero-cost framework integration with GATs
impl StreamingAdapter for YourFramework {
    type Request = YourRequest;
    type Response = YourResponse;
    type Error = YourError;

    // TRUE zero-cost futures - no Box allocation!
    type StreamingResponseFuture<'a> = impl Future<Output = IntegrationResult<Self::Response>> + Send + 'a
    where
        Self: 'a;

    fn create_streaming_response<'a>(
        &'a self,
        session_id: SessionId,
        frames: Vec<StreamFrame>,
        format: StreamingFormat,
    ) -> Self::StreamingResponseFuture<'a> {
        // Direct async block - compiler generates optimal Future type
        async move {
            // Your framework-specific logic here
            Ok(your_response)
        }
    }

    fn framework_name(&self) -> &'static str {
        "your_framework"
    }
}

Performance Benefits of Nightly Rust

Zero-Cost Abstractions:

  • 1.82x faster trait dispatch vs async_trait
  • Zero heap allocations for futures
  • Pure stack allocation - no runtime overhead
  • Static dispatch eliminates vtables
  • Complete inlining for hot paths
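
For comparison, the unboxed-future idea can be demonstrated on stable Rust with return-position `impl Trait` in traits (Rust 1.75+, edition 2018 or later). This is a related stable feature, not the nightly mechanism the crate uses, and `AdapterSketch`, `DemoFramework`, and `block_on` are hypothetical names for this sketch only.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for an adapter trait. The method returns an unboxed
/// future, so callers use static dispatch and no `Box<dyn Future>` is created.
trait AdapterSketch {
    fn create_streaming_response(
        &self,
        frames: Vec<String>,
    ) -> impl Future<Output = Result<String, String>> + Send;

    fn framework_name(&self) -> &'static str;
}

struct DemoFramework;

impl AdapterSketch for DemoFramework {
    fn create_streaming_response(
        &self,
        frames: Vec<String>,
    ) -> impl Future<Output = Result<String, String>> + Send {
        // The async block has a concrete, compiler-generated type.
        async move { Ok::<String, String>(frames.join("\n")) }
    }

    fn framework_name(&self) -> &'static str {
        "demo"
    }
}

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor so the example needs no async runtime.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

In a real application the future would be awaited inside a runtime such as tokio; the busy-polling `block_on` exists only to keep the sketch dependency-free.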

Currently Supported

  • ✅ Axum: Full native integration with zero-cost GAT futures
  • 🔧 Any Framework: Universal adapter with true zero-cost abstractions
  • 📋 Planned: Helper macros for popular frameworks (Actix, Warp, Tide)

Integration Examples

// Axum (native support)
use pjson_rs::infrastructure::http::axum_adapter::create_pjs_router;
let app = create_pjs_router().with_state(app_state);

// Custom framework integration
use pjson_rs::infrastructure::adapters::UniversalAdapter;
let adapter = UniversalAdapter::new()
    .with_serializer(your_serializer)
    .with_transport(your_transport);

Production Features

Middleware Stack

The HTTP server includes production-ready middleware:

use pjson_rs::infrastructure::http::middleware::*;

let app = create_pjs_router()
    .layer(axum::middleware::from_fn(pjs_cors_middleware))
    .layer(axum::middleware::from_fn(security_middleware))
    .layer(axum::middleware::from_fn(health_check_middleware))
    .layer(PjsMiddleware::new()
        .with_compression(true)
        .with_metrics(true)
        .with_max_request_size(10 * 1024 * 1024))
    .with_state(app_state);

Monitoring & Metrics

Built-in Prometheus metrics support:

// Automatically tracks:
// - pjs_active_sessions
// - pjs_total_sessions_created  
// - pjs_frames_processed_total
// - pjs_bytes_streamed_total
// - pjs_frame_processing_time_ms

let metrics = collector.export_prometheus();
// Expose at /metrics endpoint for Prometheus scraping
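
To show what a Prometheus export looks like, here is a dependency-free sketch that tracks two of the metrics listed above with atomics and renders them in the Prometheus text exposition format. `MetricsSketch` and its methods are hypothetical; the crate's collector and its exact output may differ.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free counters that can be shared across threads by reference.
#[derive(Default)]
struct MetricsSketch {
    active_sessions: AtomicU64,
    frames_processed_total: AtomicU64,
}

impl MetricsSketch {
    fn session_started(&self) {
        self.active_sessions.fetch_add(1, Ordering::Relaxed);
    }

    fn frame_processed(&self) {
        self.frames_processed_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Renders the metrics as `# TYPE` lines followed by `name value` samples.
    fn export_prometheus(&self) -> String {
        format!(
            "# TYPE pjs_active_sessions gauge\n\
             pjs_active_sessions {}\n\
             # TYPE pjs_frames_processed_total counter\n\
             pjs_frames_processed_total {}\n",
            self.active_sessions.load(Ordering::Relaxed),
            self.frames_processed_total.load(Ordering::Relaxed),
        )
    }
}
```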

Event System

Comprehensive domain event tracking:

// Events automatically generated:
// - SessionCreated, SessionActivated, SessionEnded
// - StreamStarted, StreamCompleted, FrameGenerated
// - PriorityAdjusted, ErrorOccurred

publisher.subscribe("SessionCreated", |event| {
    println!("New session: {}", event.session_id());
});
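
The subscription model can be sketched as a map from event kind to handlers. `Event` and `PublisherSketch` are hypothetical types for illustration; the crate's publisher is likely async and uses its own event types.

```rust
use std::collections::HashMap;

/// Simplified domain event: a kind such as "SessionCreated" plus a session id.
#[derive(Debug, Clone, PartialEq)]
struct Event {
    kind: &'static str,
    session_id: String,
}

type Handler = Box<dyn Fn(&Event) + Send>;

/// Synchronous in-memory publisher keyed by event kind.
#[derive(Default)]
struct PublisherSketch {
    handlers: HashMap<&'static str, Vec<Handler>>,
}

impl PublisherSketch {
    fn subscribe(&mut self, kind: &'static str, handler: impl Fn(&Event) + Send + 'static) {
        self.handlers.entry(kind).or_default().push(Box::new(handler));
    }

    /// Delivers the event to every handler registered for its kind and
    /// returns how many handlers ran.
    fn publish(&self, event: &Event) -> usize {
        let handlers = self
            .handlers
            .get(event.kind)
            .map(Vec::as_slice)
            .unwrap_or(&[]);
        for handler in handlers {
            handler(event);
        }
        handlers.len()
    }
}
```

Events with no subscribers are simply dropped, so publishers do not need to know who is listening.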

Contributing

We welcome contributions! See CONTRIBUTING.md for guidelines.

Development Setup

# Install development tools
rustup component add clippy rustfmt

# Run checks
cargo clippy --workspace
cargo fmt --check

# Run all tests
cargo test --workspace --all-features

License

Licensed under either of:

at your option.

Getting Started Right Now

Want to try PJS immediately? Here's the fastest way:

# Clone and run
git clone https://github.com/bug-ops/pjs
cd pjs

# Set nightly Rust (required)
rustup override set nightly

# Run the server
cargo run --example axum_server

# In another terminal, test the API
curl -X POST http://localhost:3000/pjs/sessions \
  -H "Content-Type: application/json" \
  -d '{"max_concurrent_streams": 5}'

# Try Server-Sent Events streaming  
curl -N -H "Accept: text/event-stream" \
  http://localhost:3000/pjs/stream/{session_id}/sse

Running Performance Benchmarks

To verify the performance claims, run the comprehensive benchmark suite:

# Run all benchmarks
cargo bench -p pjs-bench

# Or run specific benchmarks:
cargo bench -p pjs-bench --bench simple_throughput    # Core parsing speed
cargo bench -p pjs-bench --bench memory_benchmarks    # Memory efficiency  
cargo bench -p pjs-bench --bench streaming_benchmarks # Progressive loading

Results show PJS 6.3x faster than serde_json and 1.06x faster than sonic-rs on large JSON.

Once started, the example server (cargo run --example axum_server) prints:

  • 🚀 Server starting message
  • 📊 Health check endpoint
  • 📝 Available API endpoints
  • 🎯 Demo data streaming capabilities

Roadmap

Next Steps

  • Connection lifecycle management ✅
  • WebSocket real-time streaming ✅
  • Performance benchmarks vs alternatives ✅
  • JavaScript/TypeScript client library ✅
  • Universal framework integration layer
  • Schema validation engine
  • Custom priority strategies

Acknowledgments

Built with:

  • sonic-rs - Lightning fast SIMD JSON parser
  • axum - Ergonomic web framework for Rust
  • tokio - Async runtime for Rust
  • bytes - Efficient byte buffer management

Community


PJS: Because users shouldn't wait for data they don't need yet.