use anyhow::Result;
use oxirs_stream::{
EdgeLocation, OptimizationLevel, ProcessingSpecialization, StreamEvent, WasmEdgeConfig,
WasmEdgeProcessor, WasmPlugin, WasmResourceLimits,
};
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
info!("Starting Edge Computing with WASM Example");
basic_edge_processing_example().await?;
multi_region_edge_example().await?;
hot_swap_plugin_example().await?;
specialized_processing_example().await?;
Ok(())
}
async fn basic_edge_processing_example() -> Result<()> {
info!("=== Basic Edge Processing Example ===");
let config = WasmEdgeConfig {
optimization_level: OptimizationLevel::Release,
resource_limits: WasmResourceLimits {
max_memory_bytes: 100 * 1024 * 1024, max_execution_time_ms: 1000, max_stack_size_bytes: 1024 * 1024, max_table_elements: 10000,
enable_simd: true,
enable_threads: false,
..Default::default()
},
enable_caching: true,
enable_jit: true,
security_sandbox: true,
allowed_imports: vec!["env".to_string(), "wasi_snapshot_preview1".to_string()],
..Default::default()
};
let mut processor = WasmEdgeProcessor::new(config)?;
let plugin_wasm = create_simple_filter_plugin()?;
let plugin = WasmPlugin {
id: "filter-plugin-v1".to_string(),
name: "Simple Event Filter".to_string(),
version: "1.0.0".to_string(),
description: "Basic event filtering plugin".to_string(),
author: "OxiRS Team".to_string(),
capabilities: vec![],
wasm_bytes: plugin_wasm,
schema: oxirs_stream::PluginSchema::default(),
performance_profile: oxirs_stream::PerformanceProfile::default(),
security_level: oxirs_stream::SecurityLevel::Standard,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
processor.load_plugin(plugin).await?;
info!("WASM plugin loaded successfully");
for i in 0..50 {
let event = create_test_event(i);
let result = processor.process(event).await?;
if let Some(_processed) = result.output {
info!(
"Edge processed event {}: latency={:.2}ms",
i, result.latency_ms
);
}
if i % 10 == 0 {
let stats = processor.get_stats().await;
info!(
"Stats - processed: {}, avg latency: {:.2}ms",
stats.total_processed, stats.average_latency_ms
);
}
}
Ok(())
}
async fn multi_region_edge_example() -> Result<()> {
info!("=== Multi-Region Edge Deployment Example ===");
let locations = vec![
EdgeLocation {
id: "us-west".to_string(),
region: "us-west-1".to_string(),
latency_ms: 5.0,
capacity_factor: 1.0,
available_resources: oxirs_stream::ResourceMetrics {
cpu_cores: 2,
memory_mb: 512,
storage_gb: 100,
network_mbps: 1000.0,
gpu_units: 0,
quantum_qubits: 0,
},
specializations: vec![
ProcessingSpecialization::RdfProcessing,
ProcessingSpecialization::SparqlOptimization,
],
},
EdgeLocation {
id: "eu-central".to_string(),
region: "eu-central-1".to_string(),
latency_ms: 8.0,
capacity_factor: 0.9,
available_resources: oxirs_stream::ResourceMetrics {
cpu_cores: 2,
memory_mb: 512,
storage_gb: 100,
network_mbps: 1000.0,
gpu_units: 0,
quantum_qubits: 0,
},
specializations: vec![ProcessingSpecialization::GraphAnalytics],
},
EdgeLocation {
id: "ap-southeast".to_string(),
region: "ap-southeast-1".to_string(),
latency_ms: 12.0,
capacity_factor: 0.8,
available_resources: oxirs_stream::ResourceMetrics {
cpu_cores: 1,
memory_mb: 256,
storage_gb: 50,
network_mbps: 500.0,
gpu_units: 0,
quantum_qubits: 0,
},
specializations: vec![ProcessingSpecialization::MachineLearning],
},
];
let config = WasmEdgeConfig {
optimization_level: OptimizationLevel::Adaptive,
resource_limits: WasmResourceLimits {
max_memory_bytes: 200 * 1024 * 1024,
max_execution_time_ms: 500,
max_stack_size_bytes: 512 * 1024,
max_table_elements: 5000,
enable_simd: true,
enable_threads: false,
..Default::default()
},
enable_caching: true,
enable_jit: true,
security_sandbox: true,
allowed_imports: vec!["env".to_string()],
..Default::default()
};
let processor = WasmEdgeProcessor::new(config)?;
let client_locations = [
(37.7749, -122.4194), (51.5074, -0.1278), (1.3521, 103.8198), ];
for (i, (lat, lon)) in client_locations.iter().enumerate() {
let optimal_location = select_optimal_edge_location(&locations, *lat, *lon);
info!(
"Client {} -> Routing to edge: {} (latency: {:.2}ms)",
i, optimal_location.id, optimal_location.latency_ms
);
let event = create_test_event(i as u64);
let result = processor
.process_at_location(event, optimal_location)
.await?;
info!(
"Processed at {} edge: total latency={:.2}ms",
optimal_location.id, result.latency_ms
);
}
Ok(())
}
async fn hot_swap_plugin_example() -> Result<()> {
info!("=== Hot-Swap Plugin Example ===");
let config = WasmEdgeConfig {
optimization_level: OptimizationLevel::Release,
resource_limits: WasmResourceLimits {
max_memory_bytes: 100 * 1024 * 1024,
max_execution_time_ms: 1000,
max_stack_size_bytes: 1024 * 1024,
max_table_elements: 10000,
enable_simd: true,
enable_threads: false,
..Default::default()
},
enable_caching: true,
enable_jit: true,
security_sandbox: true,
allowed_imports: vec!["env".to_string()],
..Default::default()
};
let mut processor = WasmEdgeProcessor::new(config)?;
info!("Loading plugin version 1.0...");
let plugin_v1 = WasmPlugin {
id: "processor-v1".to_string(),
name: "Event Processor".to_string(),
version: "1.0.0".to_string(),
description: "Simple event processor v1".to_string(),
author: "OxiRS Team".to_string(),
capabilities: vec![],
wasm_bytes: create_simple_filter_plugin()?,
schema: oxirs_stream::PluginSchema::default(),
performance_profile: oxirs_stream::PerformanceProfile::default(),
security_level: oxirs_stream::SecurityLevel::Standard,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
processor.load_plugin(plugin_v1).await?;
for i in 0..5 {
let event = create_test_event(i);
processor.process(event).await?;
}
info!("Processed 5 events with plugin v1");
info!("Hot-swapping to plugin version 2.0...");
let plugin_v2 = WasmPlugin {
id: "processor-v2".to_string(),
name: "Event Processor".to_string(),
version: "2.0.0".to_string(),
description: "Enhanced event processor v2".to_string(),
author: "OxiRS Team".to_string(),
capabilities: vec![],
wasm_bytes: create_enhanced_filter_plugin()?,
schema: oxirs_stream::PluginSchema::default(),
performance_profile: oxirs_stream::PerformanceProfile::default(),
security_level: oxirs_stream::SecurityLevel::Standard,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
processor.hot_swap_plugin("processor-v1", plugin_v2).await?;
info!("Plugin hot-swapped successfully!");
for i in 5..10 {
let event = create_test_event(i);
processor.process(event).await?;
}
info!("Processed 5 events with plugin v2 (zero downtime)");
Ok(())
}
async fn specialized_processing_example() -> Result<()> {
info!("=== Specialized Processing Example ===");
let rdf_config = WasmEdgeConfig {
optimization_level: OptimizationLevel::Maximum,
resource_limits: WasmResourceLimits {
max_memory_bytes: 500 * 1024 * 1024, max_execution_time_ms: 2000,
max_stack_size_bytes: 2 * 1024 * 1024,
max_table_elements: 50000,
enable_simd: true,
enable_threads: false,
..Default::default()
},
enable_caching: true,
enable_jit: true,
security_sandbox: true,
allowed_imports: vec!["env".to_string()],
..Default::default()
};
let _rdf_processor = WasmEdgeProcessor::new(rdf_config)?;
let _rdf_plugin = WasmPlugin {
id: "rdf-processor".to_string(),
name: "RDF Stream Processor".to_string(),
version: "1.0.0".to_string(),
description: "RDF and semantic graph processor".to_string(),
author: "OxiRS Team".to_string(),
capabilities: vec![],
wasm_bytes: vec![], schema: oxirs_stream::PluginSchema::default(),
performance_profile: oxirs_stream::PerformanceProfile::default(),
security_level: oxirs_stream::SecurityLevel::High,
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
};
info!("RDF-specialized edge processor configured");
info!("Ready for semantic graph processing at the edge");
Ok(())
}
fn create_test_event(id: u64) -> StreamEvent {
use oxirs_stream::EventMetadata;
StreamEvent::TripleAdded {
subject: format!("http://example.org/subject{}", id),
predicate: "http://example.org/predicate".to_string(),
object: format!("value{}", id),
graph: None,
metadata: EventMetadata {
event_id: format!("event-{}", id),
timestamp: chrono::Utc::now(),
source: "edge-generator".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: std::collections::HashMap::new(),
checksum: None,
},
}
}
fn create_simple_filter_plugin() -> Result<Vec<u8>> {
Ok(vec![])
}
fn create_enhanced_filter_plugin() -> Result<Vec<u8>> {
Ok(vec![])
}
fn select_optimal_edge_location(
locations: &[EdgeLocation],
_client_lat: f64,
_client_lon: f64,
) -> &EdgeLocation {
locations
.iter()
.min_by(|a, b| a.latency_ms.partial_cmp(&b.latency_ms).unwrap())
.unwrap()
}