1use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::ffi::CStr;
10use std::os::raw::c_char;
11use std::sync::Arc;
12use std::sync::OnceLock;
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::sync::RwLock;
15
16use crate::ipc_types::{
17 CommandDescriptor, ExtensionDescriptor, ExtensionError, ExtensionMetadata,
18 ExtensionMetricValue, ExtensionStats, MetricDescriptor, PushOutputMessage, Result,
19};
20
21macro_rules! define_capabilities {
26 ($($variant:ident => $const_name:ident => $name:literal => $doc:literal),* $(,)?) => {
27 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
29 pub enum ExtensionCapability {
30 $(
31 #[doc = $doc]
32 #[serde(rename = $name)]
33 $variant,
34 )*
35 #[serde(rename = "custom")]
36 Custom(String),
37 }
38
39 impl ExtensionCapability {
40 pub fn is_custom(&self) -> bool {
41 matches!(self, ExtensionCapability::Custom(_))
42 }
43
44 pub fn name(&self) -> String {
45 match self {
46 $(ExtensionCapability::$variant => $name.to_string(),)*
47 ExtensionCapability::Custom(name) => name.clone(),
48 }
49 }
50
51 pub fn all_capabilities() -> Vec<Self> {
52 vec![$(ExtensionCapability::$variant,)*]
53 }
54
55 pub fn from_name(name: &str) -> Option<Self> {
56 match name {
57 $($name => Some(ExtensionCapability::$variant),)*
58 _ => Some(ExtensionCapability::Custom(name.to_string())),
59 }
60 }
61 }
62
63 pub mod capabilities {
65 $(pub const $const_name: &str = $name;)*
66 }
67 };
68}
69
70define_capabilities! {
71 DeviceMetricsRead => DEVICE_METRICS_READ => "device_metrics_read" => "Access to device metrics (read current state)",
72 DeviceMetricsWrite => DEVICE_METRICS_WRITE => "device_metrics_write" => "Access to write device metrics (including virtual metrics)",
73 DeviceControl => DEVICE_CONTROL => "device_control" => "Access to control devices (send commands)",
74 StorageQuery => STORAGE_QUERY => "storage_query" => "Access to storage queries (read telemetry)",
75 EventPublish => EVENT_PUBLISH => "event_publish" => "Access to publish events",
76 EventSubscribe => EVENT_SUBSCRIBE => "event_subscribe" => "Access to subscribe to events",
77 TelemetryHistory => TELEMETRY_HISTORY => "telemetry_history" => "Access to query device telemetry history",
78 MetricsAggregate => METRICS_AGGREGATE => "metrics_aggregate" => "Access to aggregate device metrics",
79 ExtensionCall => EXTENSION_CALL => "extension_call" => "Access to call other extensions",
80 AgentTrigger => AGENT_TRIGGER => "agent_trigger" => "Access to trigger agents",
81 RuleTrigger => RULE_TRIGGER => "rule_trigger" => "Access to trigger rules",
82 DeviceTemplateRegister => DEVICE_TEMPLATE_REGISTER => "device_template_register" => "Register device type templates",
83 DeviceRegister => DEVICE_REGISTER => "device_register" => "Register device instances",
84 DeviceUnregister => DEVICE_UNREGISTER => "device_unregister" => "Unregister device instances",
85}
86
87impl ExtensionCapability {
88 pub fn display_name(&self) -> String {
89 match self {
90 ExtensionCapability::DeviceMetricsRead => "Device Metrics Read".to_string(),
91 ExtensionCapability::DeviceMetricsWrite => "Device Metrics Write".to_string(),
92 ExtensionCapability::DeviceControl => "Device Control".to_string(),
93 ExtensionCapability::StorageQuery => "Storage Query".to_string(),
94 ExtensionCapability::EventPublish => "Event Publish".to_string(),
95 ExtensionCapability::EventSubscribe => "Event Subscribe".to_string(),
96 ExtensionCapability::TelemetryHistory => "Telemetry History".to_string(),
97 ExtensionCapability::MetricsAggregate => "Metrics Aggregate".to_string(),
98 ExtensionCapability::ExtensionCall => "Extension Call".to_string(),
99 ExtensionCapability::AgentTrigger => "Agent Trigger".to_string(),
100 ExtensionCapability::RuleTrigger => "Rule Trigger".to_string(),
101 ExtensionCapability::DeviceTemplateRegister => "Device Template Register".to_string(),
102 ExtensionCapability::DeviceRegister => "Device Register".to_string(),
103 ExtensionCapability::DeviceUnregister => "Device Unregister".to_string(),
104 ExtensionCapability::Custom(name) => format!("Custom: {}", name),
105 }
106 }
107
108 pub fn description(&self) -> String {
109 match self {
110 ExtensionCapability::DeviceMetricsRead => {
111 "Read current device metrics and state".to_string()
112 }
113 ExtensionCapability::DeviceMetricsWrite => {
114 "Write device metrics including virtual metrics".to_string()
115 }
116 ExtensionCapability::DeviceControl => "Send commands to control devices".to_string(),
117 ExtensionCapability::StorageQuery => "Query stored telemetry data".to_string(),
118 ExtensionCapability::EventPublish => "Publish events".to_string(),
119 ExtensionCapability::EventSubscribe => "Subscribe to events".to_string(),
120 ExtensionCapability::TelemetryHistory => {
121 "Query device telemetry history data".to_string()
122 }
123 ExtensionCapability::MetricsAggregate => {
124 "Aggregate and calculate device metrics".to_string()
125 }
126 ExtensionCapability::ExtensionCall => "Call other extensions".to_string(),
127 ExtensionCapability::AgentTrigger => "Trigger AI agent execution".to_string(),
128 ExtensionCapability::RuleTrigger => "Trigger rule engine execution".to_string(),
129 ExtensionCapability::DeviceTemplateRegister => {
130 "Register device type templates".to_string()
131 }
132 ExtensionCapability::DeviceRegister => "Register device instances".to_string(),
133 ExtensionCapability::DeviceUnregister => "Unregister device instances".to_string(),
134 ExtensionCapability::Custom(_) => "Custom capability".to_string(),
135 }
136 }
137
138 pub fn category(&self) -> String {
139 match self {
140 ExtensionCapability::DeviceMetricsRead
141 | ExtensionCapability::DeviceMetricsWrite
142 | ExtensionCapability::DeviceControl
143 | ExtensionCapability::DeviceTemplateRegister
144 | ExtensionCapability::DeviceRegister
145 | ExtensionCapability::DeviceUnregister => "device".to_string(),
146 ExtensionCapability::StorageQuery => "storage".to_string(),
147 ExtensionCapability::EventPublish | ExtensionCapability::EventSubscribe => {
148 "event".to_string()
149 }
150 ExtensionCapability::TelemetryHistory | ExtensionCapability::MetricsAggregate => {
151 "telemetry".to_string()
152 }
153 ExtensionCapability::ExtensionCall => "extension".to_string(),
154 ExtensionCapability::AgentTrigger => "agent".to_string(),
155 ExtensionCapability::RuleTrigger => "rule".to_string(),
156 ExtensionCapability::Custom(_) => "custom".to_string(),
157 }
158 }
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct CapabilityManifest {
164 pub capabilities: Vec<ExtensionCapability>,
165 pub api_version: String,
166 pub min_core_version: String,
167 pub package_name: String,
168}
169
170#[derive(Debug, thiserror::Error)]
176pub enum CapabilityError {
177 #[error("Capability not available: {0:?}")]
178 NotAvailable(ExtensionCapability),
179 #[error("Provider error: {0}")]
180 ProviderError(String),
181 #[error("Invalid parameters: {0}")]
182 InvalidParameters(String),
183 #[error("Provider not found for capability: {0:?}")]
184 ProviderNotFound(ExtensionCapability),
185}
186
187#[async_trait]
189pub trait ExtensionCapabilityProvider: Send + Sync {
190 fn capability_manifest(&self) -> CapabilityManifest;
191
192 async fn invoke_capability(
193 &self,
194 capability: ExtensionCapability,
195 params: &serde_json::Value,
196 ) -> std::result::Result<serde_json::Value, CapabilityError>;
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct ExtensionContextConfig {
206 #[serde(default)]
207 pub api_base_url: String,
208 pub api_version: String,
209 pub extension_id: String,
210 #[serde(default)]
211 pub rate_limit: Option<usize>,
212}
213
214impl Default for ExtensionContextConfig {
215 fn default() -> Self {
216 Self {
217 api_base_url: String::new(),
218 api_version: "v1".to_string(),
219 extension_id: String::new(),
220 rate_limit: None,
221 }
222 }
223}
224
225#[derive(Debug, Clone, Default)]
227pub struct AvailableCapabilities {
228 capabilities: HashMap<ExtensionCapability, (String, String)>,
229}
230
231impl AvailableCapabilities {
232 pub fn new() -> Self {
233 Self {
234 capabilities: HashMap::new(),
235 }
236 }
237
238 pub fn register_capability(
239 &mut self,
240 capability: ExtensionCapability,
241 package_name: String,
242 api_version: String,
243 ) {
244 self.capabilities
245 .insert(capability, (package_name, api_version));
246 }
247
248 pub fn has_capability(&self, capability: &ExtensionCapability) -> bool {
249 self.capabilities.contains_key(capability)
250 }
251
252 pub fn get_provider(&self, capability: &ExtensionCapability) -> Option<(String, String)> {
253 self.capabilities.get(capability).cloned()
254 }
255
256 pub fn list(&self) -> Vec<(ExtensionCapability, String, String)> {
257 self.capabilities
258 .iter()
259 .map(|(cap, (pkg, ver))| (cap.clone(), pkg.clone(), ver.clone()))
260 .collect()
261 }
262}
263
264#[derive(Clone)]
266pub struct ExtensionContext {
267 config: ExtensionContextConfig,
268 available_capabilities: Arc<RwLock<AvailableCapabilities>>,
269 providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
270}
271
272impl ExtensionContext {
273 pub fn new(
274 config: ExtensionContextConfig,
275 providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
276 ) -> Self {
277 Self {
278 config,
279 available_capabilities: Arc::new(RwLock::new(AvailableCapabilities::new())),
280 providers,
281 }
282 }
283
284 pub fn with_defaults(
285 extension_id: String,
286 api_base_url: String,
287 providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
288 ) -> Self {
289 Self::new(
290 ExtensionContextConfig {
291 extension_id,
292 api_base_url,
293 ..Default::default()
294 },
295 providers,
296 )
297 }
298
299 pub fn extension_id(&self) -> &str {
300 &self.config.extension_id
301 }
302
303 pub async fn register_provider(
304 &self,
305 package_name: String,
306 provider: Arc<dyn ExtensionCapabilityProvider>,
307 ) {
308 let manifest = provider.capability_manifest();
309 let mut available = self.available_capabilities.write().await;
310 for capability in &manifest.capabilities {
311 available.register_capability(
312 capability.clone(),
313 package_name.clone(),
314 manifest.api_version.clone(),
315 );
316 }
317 let mut providers = self.providers.write().await;
318 providers.insert(package_name, provider);
319 }
320
321 pub async fn invoke_capability(
322 &self,
323 capability: ExtensionCapability,
324 params: &serde_json::Value,
325 ) -> std::result::Result<serde_json::Value, CapabilityError> {
326 let available = self.available_capabilities.read().await;
327 let (package_name, _) = available
328 .get_provider(&capability)
329 .ok_or_else(|| CapabilityError::ProviderNotFound(capability.clone()))?;
330
331 let providers = self.providers.read().await;
332 let provider = providers.get(&package_name).ok_or_else(|| {
333 CapabilityError::ProviderError(format!("Provider '{}' not found", package_name))
334 })?;
335
336 let params = match capability {
338 ExtensionCapability::DeviceRegister => {
339 let mut p = params.as_object().cloned().unwrap_or_default();
340 p.insert("_extension_id".to_string(), serde_json::json!(self.config.extension_id));
341 serde_json::Value::Object(p)
342 }
343 _ => params.clone(),
344 };
345 provider.invoke_capability(capability, ¶ms).await
346 }
347
348 pub async fn has_capability(&self, capability: &ExtensionCapability) -> bool {
349 let available = self.available_capabilities.read().await;
350 available.has_capability(capability)
351 }
352
353 pub async fn list_capabilities(&self) -> Vec<(ExtensionCapability, String, String)> {
354 let available = self.available_capabilities.read().await;
355 available.list()
356 }
357
358 pub fn config(&self) -> &ExtensionContextConfig {
359 &self.config
360 }
361}
362
363#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
369pub enum StreamDirection {
370 #[serde(rename = "upload")]
371 #[default]
372 Upload,
373 #[serde(rename = "download")]
374 Download,
375 #[serde(rename = "bidirectional")]
376 Bidirectional,
377}
378
379#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
381pub enum StreamMode {
382 #[serde(rename = "stateless")]
383 #[default]
384 Stateless,
385 #[serde(rename = "stateful")]
386 Stateful,
387 #[serde(rename = "push")]
388 Push,
389}
390
391#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
393pub enum StreamDataType {
394 #[serde(rename = "binary")]
395 Binary,
396 #[serde(rename = "text")]
397 Text,
398 #[serde(rename = "json")]
399 Json,
400 #[serde(rename = "image")]
401 Image { format: String },
402 #[serde(rename = "audio")]
403 Audio {
404 format: String,
405 sample_rate: u32,
406 channels: u16,
407 },
408 #[serde(rename = "video")]
409 Video {
410 codec: String,
411 width: u32,
412 height: u32,
413 fps: u32,
414 },
415 #[serde(rename = "sensor")]
416 Sensor { sensor_type: String },
417 #[serde(rename = "custom")]
418 Custom { mime_type: String },
419}
420
421impl StreamDataType {
422 pub fn mime_type(&self) -> String {
423 match self {
424 StreamDataType::Binary => "application/octet-stream".to_string(),
425 StreamDataType::Text => "text/plain".to_string(),
426 StreamDataType::Json => "application/json".to_string(),
427 StreamDataType::Image { format } => match format.to_lowercase().as_str() {
428 "jpeg" | "jpg" => "image/jpeg".to_string(),
429 "png" => "image/png".to_string(),
430 "gif" => "image/gif".to_string(),
431 "webp" => "image/webp".to_string(),
432 "bmp" => "image/bmp".to_string(),
433 _ => format!("image/{}", format),
434 },
435 StreamDataType::Audio { format, .. } => match format.to_lowercase().as_str() {
436 "pcm" => "audio/pcm".to_string(),
437 "mp3" => "audio/mpeg".to_string(),
438 "aac" => "audio/aac".to_string(),
439 "wav" => "audio/wav".to_string(),
440 "ogg" => "audio/ogg".to_string(),
441 _ => format!("audio/{}", format),
442 },
443 StreamDataType::Video { codec, .. } => match codec.to_lowercase().as_str() {
444 "h264" | "h.264" => "video/h264".to_string(),
445 "h265" | "h.265" | "hevc" => "video/h265".to_string(),
446 "vp8" => "video/vp8".to_string(),
447 "vp9" => "video/vp9".to_string(),
448 "av1" => "video/av1".to_string(),
449 _ => format!("video/{}", codec),
450 },
451 StreamDataType::Sensor { sensor_type } => {
452 format!("application/x-sensor.{}", sensor_type)
453 }
454 StreamDataType::Custom { mime_type } => mime_type.clone(),
455 }
456 }
457
458 pub fn from_mime_type(mime: &str) -> Option<Self> {
459 match mime {
460 "application/octet-stream" => Some(StreamDataType::Binary),
461 "text/plain" => Some(StreamDataType::Text),
462 "application/json" => Some(StreamDataType::Json),
463 m if m.starts_with("image/") => Some(StreamDataType::Image {
464 format: m.strip_prefix("image/")?.to_string(),
465 }),
466 m if m.starts_with("audio/") => Some(StreamDataType::Audio {
467 format: m.strip_prefix("audio/")?.to_string(),
468 sample_rate: 48000,
469 channels: 2,
470 }),
471 m if m.starts_with("video/") => Some(StreamDataType::Video {
472 codec: m.strip_prefix("video/")?.to_string(),
473 width: 1920,
474 height: 1080,
475 fps: 30,
476 }),
477 m if m.starts_with("application/x-sensor.") => Some(StreamDataType::Sensor {
478 sensor_type: m.strip_prefix("application/x-sensor.")?.to_string(),
479 }),
480 _ => Some(StreamDataType::Custom {
481 mime_type: mime.to_string(),
482 }),
483 }
484 }
485}
486
487#[derive(Debug, Clone, Serialize, Deserialize)]
489pub struct DataChunk {
490 pub sequence: u64,
491 pub data_type: StreamDataType,
492 pub data: Vec<u8>,
493 pub timestamp: i64,
494 #[serde(skip_serializing_if = "Option::is_none")]
495 pub metadata: Option<serde_json::Value>,
496 pub is_last: bool,
497}
498
499impl DataChunk {
500 pub fn binary(sequence: u64, data: Vec<u8>) -> Self {
501 Self {
502 sequence,
503 data_type: StreamDataType::Binary,
504 data,
505 timestamp: chrono::Utc::now().timestamp_millis(),
506 metadata: None,
507 is_last: false,
508 }
509 }
510
511 pub fn text(sequence: u64, text: String) -> Self {
512 Self {
513 sequence,
514 data_type: StreamDataType::Text,
515 data: text.into_bytes(),
516 timestamp: chrono::Utc::now().timestamp_millis(),
517 metadata: None,
518 is_last: false,
519 }
520 }
521
522 pub fn json(
523 sequence: u64,
524 value: serde_json::Value,
525 ) -> std::result::Result<Self, serde_json::Error> {
526 Ok(Self {
527 sequence,
528 data_type: StreamDataType::Json,
529 data: serde_json::to_vec(&value)?,
530 timestamp: chrono::Utc::now().timestamp_millis(),
531 metadata: None,
532 is_last: false,
533 })
534 }
535
536 pub fn image(sequence: u64, data: Vec<u8>, format: String) -> Self {
537 Self {
538 sequence,
539 data_type: StreamDataType::Image { format },
540 data,
541 timestamp: chrono::Utc::now().timestamp_millis(),
542 metadata: None,
543 is_last: false,
544 }
545 }
546
547 pub fn with_last(mut self) -> Self {
548 self.is_last = true;
549 self
550 }
551
552 pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
553 self.metadata = Some(metadata);
554 self
555 }
556}
557
558#[derive(Debug, Clone, Serialize, Deserialize)]
560pub struct StreamError {
561 pub code: String,
562 pub message: String,
563 pub retryable: bool,
564}
565
566impl StreamError {
567 pub fn new(code: impl Into<String>, message: impl Into<String>, retryable: bool) -> Self {
569 Self {
570 code: code.into(),
571 message: message.into(),
572 retryable,
573 }
574 }
575
576 pub fn fatal(code: impl Into<String>, message: impl Into<String>) -> Self {
578 Self {
579 code: code.into(),
580 message: message.into(),
581 retryable: false,
582 }
583 }
584
585 pub fn retryable(code: impl Into<String>, message: impl Into<String>) -> Self {
587 Self {
588 code: code.into(),
589 message: message.into(),
590 retryable: true,
591 }
592 }
593}
594
595impl std::fmt::Display for StreamError {
596 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597 write!(f, "[{}] {}", self.code, self.message)
598 }
599}
600
601#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct StreamResult {
604 #[serde(skip_serializing_if = "Option::is_none")]
605 pub input_sequence: Option<u64>,
606 pub output_sequence: u64,
607 pub data: Vec<u8>,
608 pub data_type: StreamDataType,
609 pub processing_ms: f32,
610 #[serde(skip_serializing_if = "Option::is_none")]
611 pub metadata: Option<serde_json::Value>,
612 #[serde(skip_serializing_if = "Option::is_none")]
613 pub error: Option<StreamError>,
614}
615
616impl StreamResult {
617 pub fn success(
618 input_sequence: Option<u64>,
619 output_sequence: u64,
620 data: Vec<u8>,
621 data_type: StreamDataType,
622 processing_ms: f32,
623 ) -> Self {
624 Self {
625 input_sequence,
626 output_sequence,
627 data,
628 data_type,
629 processing_ms,
630 metadata: None,
631 error: None,
632 }
633 }
634
635 pub fn json(
636 input_sequence: Option<u64>,
637 output_sequence: u64,
638 value: serde_json::Value,
639 processing_ms: f32,
640 ) -> std::result::Result<Self, serde_json::Error> {
641 Ok(Self::success(
642 input_sequence,
643 output_sequence,
644 serde_json::to_vec(&value)?,
645 StreamDataType::Json,
646 processing_ms,
647 ))
648 }
649
650 pub fn error(input_sequence: Option<u64>, error: StreamError) -> Self {
652 Self {
653 input_sequence,
654 output_sequence: 0,
655 data: Vec::new(),
656 data_type: StreamDataType::Binary,
657 processing_ms: 0.0,
658 metadata: None,
659 error: Some(error),
660 }
661 }
662
663 pub fn error_with_details(
665 input_sequence: Option<u64>,
666 output_sequence: u64,
667 error: StreamError,
668 processing_ms: f32,
669 ) -> Self {
670 Self {
671 input_sequence,
672 output_sequence,
673 data: Vec::new(),
674 data_type: StreamDataType::Binary,
675 processing_ms,
676 metadata: None,
677 error: Some(error),
678 }
679 }
680
681 pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
682 self.metadata = Some(metadata);
683 self
684 }
685
686 pub fn as_json(&self) -> std::result::Result<serde_json::Value, serde_json::Error> {
688 serde_json::from_slice(&self.data)
689 }
690
691 pub fn as_text(&self) -> std::result::Result<&str, std::str::Utf8Error> {
693 std::str::from_utf8(&self.data)
694 }
695}
696
697#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
699pub struct FlowControl {
700 pub supports_backpressure: bool,
701 pub window_size: u32,
702 pub supports_throttling: bool,
703 pub max_rate: u32,
704}
705
706impl FlowControl {
707 pub fn default_stream() -> Self {
708 Self {
709 supports_backpressure: true,
710 window_size: 64 * 1024,
711 supports_throttling: false,
712 max_rate: 0,
713 }
714 }
715}
716
717impl Default for FlowControl {
718 fn default() -> Self {
719 Self::default_stream()
720 }
721}
722
723#[derive(Debug, Clone, Default, Serialize, Deserialize)]
725pub struct StreamCapability {
726 pub supported_data_types: Vec<StreamDataType>,
727 pub max_chunk_size: usize,
728 pub preferred_chunk_size: usize,
729 pub max_concurrent_sessions: usize,
730 pub mode: StreamMode,
731 pub direction: StreamDirection,
732 pub flow_control: FlowControl,
733 #[serde(default)]
735 pub config_schema: Option<serde_json::Value>,
736}
737
738impl StreamCapability {
739 pub fn push() -> Self {
741 Self {
742 supported_data_types: vec![StreamDataType::Binary],
743 max_chunk_size: 64 * 1024,
744 preferred_chunk_size: 16 * 1024,
745 max_concurrent_sessions: 5,
746 mode: StreamMode::Push,
747 direction: StreamDirection::Download,
748 flow_control: FlowControl::default(),
749 config_schema: None,
750 }
751 }
752
753 pub fn upload() -> Self {
755 Self {
756 supported_data_types: vec![StreamDataType::Binary],
757 max_chunk_size: 1024 * 1024,
758 preferred_chunk_size: 64 * 1024,
759 max_concurrent_sessions: 5,
760 mode: StreamMode::Stateless,
761 direction: StreamDirection::Upload,
762 flow_control: FlowControl::default(),
763 config_schema: None,
764 }
765 }
766
767 pub fn download() -> Self {
769 Self {
770 supported_data_types: vec![StreamDataType::Binary],
771 max_chunk_size: 1024 * 1024,
772 preferred_chunk_size: 64 * 1024,
773 max_concurrent_sessions: 5,
774 mode: StreamMode::Stateless,
775 direction: StreamDirection::Download,
776 flow_control: FlowControl::default(),
777 config_schema: None,
778 }
779 }
780
781 pub fn stateful() -> Self {
783 Self {
784 supported_data_types: vec![StreamDataType::Binary],
785 max_chunk_size: 1024 * 1024,
786 preferred_chunk_size: 64 * 1024,
787 max_concurrent_sessions: 5,
788 mode: StreamMode::Stateful,
789 direction: StreamDirection::Bidirectional,
790 flow_control: FlowControl::default(),
791 config_schema: None,
792 }
793 }
794
795 pub fn with_data_type(mut self, data_type: StreamDataType) -> Self {
797 self.supported_data_types.push(data_type);
798 self
799 }
800
801 pub fn with_chunk_size(mut self, preferred: usize, max: usize) -> Self {
803 self.preferred_chunk_size = preferred;
804 self.max_chunk_size = max;
805 self
806 }
807}
808
809#[derive(Debug, Clone, Serialize, Deserialize)]
811pub struct ClientInfo {
812 pub client_id: String,
813 pub ip_addr: Option<String>,
814 pub user_agent: Option<String>,
815}
816
817#[derive(Debug, Clone, Serialize, Deserialize)]
819pub struct StreamSession {
820 pub id: String,
821 pub extension_id: String,
822 pub config: serde_json::Value,
823 pub started_at: i64,
824 pub last_activity: i64,
825 pub bytes_in: u64,
826 pub bytes_out: u64,
827 pub chunks_in: u64,
828 pub chunks_out: u64,
829 pub client_info: Option<ClientInfo>,
830 pub metadata: Option<serde_json::Value>,
831}
832
833impl StreamSession {
834 pub fn new(
835 id: String,
836 extension_id: String,
837 config: serde_json::Value,
838 client_info: ClientInfo,
839 ) -> Self {
840 let now = chrono::Utc::now().timestamp_millis();
841 Self {
842 id,
843 extension_id,
844 config,
845 started_at: now,
846 last_activity: now,
847 bytes_in: 0,
848 bytes_out: 0,
849 chunks_in: 0,
850 chunks_out: 0,
851 client_info: Some(client_info),
852 metadata: None,
853 }
854 }
855
856 pub fn age_secs(&self) -> i64 {
858 let now = chrono::Utc::now().timestamp();
859 (now - self.started_at).max(0)
860 }
861
862 pub fn age_ms(&self) -> i64 {
864 let now = chrono::Utc::now().timestamp_millis();
865 let started_ms = self.started_at * 1000;
866 (now - started_ms).max(0)
867 }
868}
869
870#[derive(Debug, Clone, Serialize, Deserialize)]
872pub struct SessionStats {
873 pub input_chunks: u64,
874 pub output_chunks: u64,
875 pub input_bytes: u64,
876 pub output_bytes: u64,
877 pub errors: u64,
878 pub last_activity: i64,
879}
880
881impl Default for SessionStats {
882 fn default() -> Self {
883 Self {
884 input_chunks: 0,
885 output_chunks: 0,
886 input_bytes: 0,
887 output_bytes: 0,
888 errors: 0,
889 last_activity: chrono::Utc::now().timestamp(),
890 }
891 }
892}
893
894impl SessionStats {
895 pub fn record_error(&mut self) {
897 self.errors += 1;
898 self.last_activity = chrono::Utc::now().timestamp();
899 }
900
901 pub fn record_input(&mut self, bytes: u64) {
903 self.input_chunks += 1;
904 self.input_bytes += bytes;
905 self.last_activity = chrono::Utc::now().timestamp();
906 }
907
908 pub fn record_output(&mut self, bytes: u64) {
910 self.output_chunks += 1;
911 self.output_bytes += bytes;
912 self.last_activity = chrono::Utc::now().timestamp();
913 }
914}
915
916#[derive(Debug, Clone, Serialize, Deserialize, Default)]
922pub struct EventFilter {
923 pub source: Option<String>,
924 pub device_id: Option<String>,
925 pub extension_id: Option<String>,
926 pub agent_id: Option<String>,
927 pub rule_id: Option<String>,
928 pub workflow_id: Option<String>,
929 pub expression: Option<String>,
930}
931
932impl EventFilter {
933 pub fn new() -> Self {
934 Self::default()
935 }
936
937 pub fn by_source(mut self, source: impl Into<String>) -> Self {
938 self.source = Some(source.into());
939 self
940 }
941
942 pub fn by_device_id(mut self, device_id: impl Into<String>) -> Self {
943 self.device_id = Some(device_id.into());
944 self
945 }
946
947 pub fn by_extension_id(mut self, extension_id: impl Into<String>) -> Self {
948 self.extension_id = Some(extension_id.into());
949 self
950 }
951
952 pub fn matches(&self, _event_type: &str, event_value: &serde_json::Value) -> bool {
953 if let Some(ref source) = self.source {
954 if event_value.get("source").and_then(|v| v.as_str()) != Some(source.as_str()) {
955 return false;
956 }
957 }
958 if let Some(ref device_id) = self.device_id {
959 if event_value.get("device_id").and_then(|v| v.as_str()) != Some(device_id.as_str()) {
960 return false;
961 }
962 }
963 true
964 }
965}
966
967#[derive(Debug, Clone, Serialize, Deserialize)]
969pub struct EventSubscription {
970 pub event_types: Vec<String>,
971 pub filters: Option<EventFilter>,
972 pub max_buffer_size: usize,
973 pub enabled: bool,
974}
975
976impl Default for EventSubscription {
977 fn default() -> Self {
978 Self {
979 event_types: Vec::new(),
980 filters: None,
981 max_buffer_size: 1000,
982 enabled: true,
983 }
984 }
985}
986
987impl EventSubscription {
988 pub fn new() -> Self {
989 Self::default()
990 }
991
992 pub fn with_types(event_types: Vec<String>) -> Self {
993 Self {
994 event_types,
995 filters: None,
996 max_buffer_size: 1000,
997 enabled: true,
998 }
999 }
1000
1001 pub fn with_filters(mut self, filters: EventFilter) -> Self {
1002 self.filters = Some(filters);
1003 self
1004 }
1005
1006 pub fn is_subscribed(&self, event_type: &str) -> bool {
1007 if !self.enabled {
1008 return false;
1009 }
1010 if self.event_types.is_empty() {
1011 return true;
1012 }
1013 self.event_types.iter().any(|et| et == event_type)
1014 }
1015}
1016
1017pub type NativeCapabilityInvokeFn = unsafe extern "C" fn(*const u8, usize) -> *mut c_char;
1023pub type NativeCapabilityFreeFn = unsafe extern "C" fn(*mut c_char);
1024
1025#[derive(Clone, Copy)]
1026struct NativeCapabilityBridge {
1027 invoke: NativeCapabilityInvokeFn,
1028 free: NativeCapabilityFreeFn,
1029}
1030
1031static NATIVE_CAPABILITY_BRIDGE: OnceLock<NativeCapabilityBridge> = OnceLock::new();
1032
1033pub fn set_native_capability_bridge(
1035 invoke: NativeCapabilityInvokeFn,
1036 free: NativeCapabilityFreeFn,
1037) {
1038 let _ = NATIVE_CAPABILITY_BRIDGE.set(NativeCapabilityBridge { invoke, free });
1039}
1040
1041pub type PushOutputWriterFn = unsafe extern "C" fn(*const u8, usize) -> i32;
1049
1050static PUSH_WRITER: OnceLock<PushOutputWriterFn> = OnceLock::new();
1051
1052pub fn set_push_output_writer(writer: PushOutputWriterFn) {
1055 let _ = PUSH_WRITER.set(writer);
1056}
1057
1058pub fn send_push_output(msg: &PushOutputMessage) -> crate::ipc_types::Result<()> {
1063 let writer = PUSH_WRITER.get().ok_or_else(|| {
1064 crate::ipc_types::ExtensionError::InternalError("push output writer not registered".into())
1065 })?;
1066 let json = serde_json::to_vec(msg).map_err(|e| {
1067 crate::ipc_types::ExtensionError::InternalError(format!(
1068 "failed to serialize PushOutputMessage: {}",
1069 e
1070 ))
1071 })?;
1072 let rc = unsafe { writer(json.as_ptr(), json.len()) };
1073 if rc == 0 {
1074 Ok(())
1075 } else {
1076 Err(crate::ipc_types::ExtensionError::InternalError(format!(
1077 "push_output_writer returned {}",
1078 rc
1079 )))
1080 }
1081}
1082
1083#[cfg(not(target_arch = "wasm32"))]
1086fn block_on_sync<F, T>(future: F) -> std::result::Result<T, CapabilityError>
1087where
1088 F: std::future::Future<Output = std::result::Result<T, CapabilityError>>,
1089{
1090 match tokio::runtime::Handle::try_current() {
1091 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(future)),
1092 Err(_) => {
1093 let runtime = tokio::runtime::Runtime::new().map_err(|e| {
1094 CapabilityError::ProviderError(format!("failed to create tokio runtime: {}", e))
1095 })?;
1096 runtime.block_on(future)
1097 }
1098 }
1099}
1100
1101#[cfg(not(target_arch = "wasm32"))]
1104#[derive(Clone)]
1105pub struct CapabilityContext {
1106 ctx: Arc<RwLock<Option<ExtensionContext>>>,
1107}
1108
1109#[cfg(not(target_arch = "wasm32"))]
1110impl Default for CapabilityContext {
1111 fn default() -> Self {
1112 Self {
1113 ctx: Arc::new(RwLock::new(None)),
1114 }
1115 }
1116}
1117
1118#[cfg(not(target_arch = "wasm32"))]
1119impl CapabilityContext {
1120 pub fn from_context(context: ExtensionContext) -> Self {
1121 Self {
1122 ctx: Arc::new(RwLock::new(Some(context))),
1123 }
1124 }
1125
1126 pub fn invoke_capability(
1127 &self,
1128 capability_name: &str,
1129 params: &serde_json::Value,
1130 ) -> serde_json::Value {
1131 let capability = match ExtensionCapability::from_name(capability_name) {
1132 Some(capability) => capability,
1133 None => {
1134 return serde_json::json!({
1135 "success": false,
1136 "error": format!("Unknown capability: {}", capability_name),
1137 });
1138 }
1139 };
1140
1141 let context = match block_on_sync(async {
1142 Ok::<Option<ExtensionContext>, CapabilityError>(self.ctx.read().await.clone())
1143 }) {
1144 Ok(context) => context,
1145 Err(error) => {
1146 return serde_json::json!({
1147 "success": false,
1148 "error": error.to_string(),
1149 });
1150 }
1151 };
1152
1153 if let Some(context) = context {
1154 return match block_on_sync(async {
1155 context.invoke_capability(capability, params).await
1156 }) {
1157 Ok(value) => value,
1158 Err(error) => serde_json::json!({
1159 "success": false,
1160 "error": error.to_string(),
1161 }),
1162 };
1163 }
1164
1165 let Some(bridge) = NATIVE_CAPABILITY_BRIDGE.get().copied() else {
1166 return serde_json::json!({
1167 "success": false,
1168 "error": "native capability bridge is not initialized",
1169 });
1170 };
1171
1172 let input = match serde_json::to_vec(&serde_json::json!({
1173 "capability": capability_name,
1174 "params": params,
1175 })) {
1176 Ok(input) => input,
1177 Err(error) => {
1178 return serde_json::json!({
1179 "success": false,
1180 "error": format!("failed to serialize capability request: {}", error),
1181 });
1182 }
1183 };
1184
1185 let ptr = unsafe { (bridge.invoke)(input.as_ptr(), input.len()) };
1186 if ptr.is_null() {
1187 return serde_json::json!({
1188 "success": false,
1189 "error": "native capability bridge returned null",
1190 });
1191 }
1192
1193 let response = unsafe { CStr::from_ptr(ptr) }.to_string_lossy().to_string();
1194 unsafe { (bridge.free)(ptr) };
1195
1196 serde_json::from_str(&response).unwrap_or_else(|error| {
1197 serde_json::json!({
1198 "success": false,
1199 "error": format!("failed to parse capability bridge response: {}", error),
1200 })
1201 })
1202 }
1203}
1204
1205#[async_trait]
1211pub trait Extension: Send + Sync {
1212 fn metadata(&self) -> &ExtensionMetadata;
1214
1215 fn descriptor(&self) -> Option<ExtensionDescriptor> {
1217 None
1218 }
1219
1220 fn init(&mut self) -> Result<()> {
1222 Ok(())
1223 }
1224
1225 fn start(&mut self) -> Result<()> {
1227 Ok(())
1228 }
1229
1230 fn stop(&mut self) -> Result<()> {
1232 Ok(())
1233 }
1234
1235 fn status(&self) -> String {
1237 "unknown".to_string()
1238 }
1239
1240 fn metrics(&self) -> Vec<MetricDescriptor> {
1242 Vec::new()
1243 }
1244
1245 fn commands(&self) -> Vec<CommandDescriptor> {
1247 Vec::new()
1248 }
1249
1250 fn produce_metrics(&self) -> Result<Vec<ExtensionMetricValue>> {
1252 Ok(Vec::new())
1253 }
1254
1255 async fn health_check(&self) -> Result<bool> {
1257 Ok(true)
1258 }
1259
1260 async fn configure(&mut self, _config: &serde_json::Value) -> Result<()> {
1262 Ok(())
1263 }
1264
1265 fn get_stats(&self) -> ExtensionStats {
1267 ExtensionStats::default()
1268 }
1269
1270 fn latest_output(&self) -> Option<PushOutputMessage> {
1272 None
1273 }
1274
1275 fn stream_capability(&self) -> Option<StreamCapability> {
1277 None
1278 }
1279
1280 async fn execute_command(
1282 &self,
1283 command_name: &str,
1284 args: &serde_json::Value,
1285 ) -> Result<serde_json::Value> {
1286 let _ = args;
1287 Err(ExtensionError::CommandNotFound(command_name.to_string()))
1288 }
1289
1290 async fn init_session(&self, _session: &StreamSession) -> Result<()> {
1292 Ok(())
1293 }
1294
1295 async fn process_session_chunk(
1297 &self,
1298 _session_id: &str,
1299 _chunk: DataChunk,
1300 ) -> Result<StreamResult> {
1301 Err(ExtensionError::ExecutionFailed(
1302 "Session streaming not supported".to_string(),
1303 ))
1304 }
1305
1306 async fn close_session(&self, _session_id: &str) -> Result<SessionStats> {
1308 Ok(SessionStats::default())
1309 }
1310
1311 async fn process_chunk(&self, _chunk: DataChunk) -> Result<StreamResult> {
1313 Err(ExtensionError::ExecutionFailed(
1314 "Streaming not supported".to_string(),
1315 ))
1316 }
1317
1318 async fn start_push(&self, _session_id: &str) -> Result<()> {
1320 Ok(())
1321 }
1322
1323 async fn stop_push(&self, _session_id: &str) -> Result<()> {
1325 Ok(())
1326 }
1327
1328 #[cfg(not(target_arch = "wasm32"))]
1331 fn set_output_sender(&self, _sender: Arc<tokio::sync::mpsc::Sender<PushOutputMessage>>) {}
1332
1333 fn event_subscriptions(&self) -> &[&str] {
1335 &[]
1336 }
1337
1338 fn handle_event(&self, _event_type: &str, _payload: &serde_json::Value) -> Result<()> {
1340 Ok(())
1341 }
1342
1343 async fn on_unload(&self) -> Result<()> {
1345 Ok(())
1346 }
1347
1348 fn as_any(&self) -> &dyn std::any::Any;
1350}
1351
1352