Skip to main content

neomind_extension_sdk/
host.rs

1//! Host API for NeoMind Extensions
2//!
3//! This module contains the Extension trait and capability system that extensions
4//! implement to integrate with the NeoMind platform.
5
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::ffi::CStr;
10use std::os::raw::c_char;
11use std::sync::Arc;
12use std::sync::OnceLock;
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::sync::RwLock;
15
16use crate::ipc_types::{
17    CommandDescriptor, ExtensionDescriptor, ExtensionError, ExtensionMetadata,
18    ExtensionMetricValue, ExtensionStats, MetricDescriptor, PushOutputMessage, Result,
19};
20
21// ============================================================================
22// Capability System
23// ============================================================================
24
// Generates the `ExtensionCapability` enum, its basic helper methods, and the
// `capabilities` module of name constants from a single table.
//
// Each table row has the form `Variant => CONST_NAME => "name" => "doc"`:
// - `Variant` becomes a unit variant of `ExtensionCapability`,
// - `CONST_NAME` becomes a `&str` constant inside `capabilities`,
// - `"name"` is used both as the serde rename and as the value of `name()`,
// - `"doc"` is attached to the variant as its documentation.
macro_rules! define_capabilities {
    ($($variant:ident => $const_name:ident => $name:literal => $doc:literal),* $(,)?) => {
        /// Extension capabilities for accessing NeoMind platform features.
        #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
        pub enum ExtensionCapability {
            $(
                #[doc = $doc]
                #[serde(rename = $name)]
                $variant,
            )*
            /// A capability that is not part of the built-in table, carrying its name.
            #[serde(rename = "custom")]
            Custom(String),
        }

        impl ExtensionCapability {
            /// Returns `true` when this value is the `Custom` variant.
            pub fn is_custom(&self) -> bool {
                matches!(self, ExtensionCapability::Custom(_))
            }

            /// Returns the capability name: the table name for built-in variants,
            /// or a copy of the wrapped string for `Custom`.
            pub fn name(&self) -> String {
                match self {
                    $(ExtensionCapability::$variant => $name.to_string(),)*
                    ExtensionCapability::Custom(name) => name.clone(),
                }
            }

            /// Returns every built-in capability in table order (`Custom` is not included).
            pub fn all_capabilities() -> Vec<Self> {
                vec![$(ExtensionCapability::$variant,)*]
            }

            /// Maps a name to a capability.
            ///
            /// Names from the table yield the matching built-in variant; any other
            /// name is wrapped in `Custom`, so this currently never returns `None`.
            pub fn from_name(name: &str) -> Option<Self> {
                match name {
                    $($name => Some(ExtensionCapability::$variant),)*
                    _ => Some(ExtensionCapability::Custom(name.to_string())),
                }
            }
        }

        /// Capability name constants.
        pub mod capabilities {
            $(pub const $const_name: &str = $name;)*
        }
    };
}
69
// Built-in capability table. Each row is
// `Variant => CONST_NAME => "name" => "doc"`; the macro expands it into the
// `ExtensionCapability` enum and the `capabilities` constants module.
define_capabilities! {
    DeviceMetricsRead => DEVICE_METRICS_READ => "device_metrics_read" => "Access to device metrics (read current state)",
    DeviceMetricsWrite => DEVICE_METRICS_WRITE => "device_metrics_write" => "Access to write device metrics (including virtual metrics)",
    DeviceControl => DEVICE_CONTROL => "device_control" => "Access to control devices (send commands)",
    StorageQuery => STORAGE_QUERY => "storage_query" => "Access to storage queries (read telemetry)",
    EventPublish => EVENT_PUBLISH => "event_publish" => "Access to publish events",
    EventSubscribe => EVENT_SUBSCRIBE => "event_subscribe" => "Access to subscribe to events",
    TelemetryHistory => TELEMETRY_HISTORY => "telemetry_history" => "Access to query device telemetry history",
    MetricsAggregate => METRICS_AGGREGATE => "metrics_aggregate" => "Access to aggregate device metrics",
    ExtensionCall => EXTENSION_CALL => "extension_call" => "Access to call other extensions",
    AgentTrigger => AGENT_TRIGGER => "agent_trigger" => "Access to trigger agents",
    RuleTrigger => RULE_TRIGGER => "rule_trigger" => "Access to trigger rules",
    DeviceTemplateRegister => DEVICE_TEMPLATE_REGISTER => "device_template_register" => "Register device type templates",
    DeviceRegister => DEVICE_REGISTER => "device_register" => "Register device instances",
    DeviceUnregister => DEVICE_UNREGISTER => "device_unregister" => "Unregister device instances",
}
86
87impl ExtensionCapability {
88    pub fn display_name(&self) -> String {
89        match self {
90            ExtensionCapability::DeviceMetricsRead => "Device Metrics Read".to_string(),
91            ExtensionCapability::DeviceMetricsWrite => "Device Metrics Write".to_string(),
92            ExtensionCapability::DeviceControl => "Device Control".to_string(),
93            ExtensionCapability::StorageQuery => "Storage Query".to_string(),
94            ExtensionCapability::EventPublish => "Event Publish".to_string(),
95            ExtensionCapability::EventSubscribe => "Event Subscribe".to_string(),
96            ExtensionCapability::TelemetryHistory => "Telemetry History".to_string(),
97            ExtensionCapability::MetricsAggregate => "Metrics Aggregate".to_string(),
98            ExtensionCapability::ExtensionCall => "Extension Call".to_string(),
99            ExtensionCapability::AgentTrigger => "Agent Trigger".to_string(),
100            ExtensionCapability::RuleTrigger => "Rule Trigger".to_string(),
101            ExtensionCapability::DeviceTemplateRegister => "Device Template Register".to_string(),
102            ExtensionCapability::DeviceRegister => "Device Register".to_string(),
103            ExtensionCapability::DeviceUnregister => "Device Unregister".to_string(),
104            ExtensionCapability::Custom(name) => format!("Custom: {}", name),
105        }
106    }
107
108    pub fn description(&self) -> String {
109        match self {
110            ExtensionCapability::DeviceMetricsRead => {
111                "Read current device metrics and state".to_string()
112            }
113            ExtensionCapability::DeviceMetricsWrite => {
114                "Write device metrics including virtual metrics".to_string()
115            }
116            ExtensionCapability::DeviceControl => "Send commands to control devices".to_string(),
117            ExtensionCapability::StorageQuery => "Query stored telemetry data".to_string(),
118            ExtensionCapability::EventPublish => "Publish events".to_string(),
119            ExtensionCapability::EventSubscribe => "Subscribe to events".to_string(),
120            ExtensionCapability::TelemetryHistory => {
121                "Query device telemetry history data".to_string()
122            }
123            ExtensionCapability::MetricsAggregate => {
124                "Aggregate and calculate device metrics".to_string()
125            }
126            ExtensionCapability::ExtensionCall => "Call other extensions".to_string(),
127            ExtensionCapability::AgentTrigger => "Trigger AI agent execution".to_string(),
128            ExtensionCapability::RuleTrigger => "Trigger rule engine execution".to_string(),
129            ExtensionCapability::DeviceTemplateRegister => {
130                "Register device type templates".to_string()
131            }
132            ExtensionCapability::DeviceRegister => "Register device instances".to_string(),
133            ExtensionCapability::DeviceUnregister => "Unregister device instances".to_string(),
134            ExtensionCapability::Custom(_) => "Custom capability".to_string(),
135        }
136    }
137
138    pub fn category(&self) -> String {
139        match self {
140            ExtensionCapability::DeviceMetricsRead
141            | ExtensionCapability::DeviceMetricsWrite
142            | ExtensionCapability::DeviceControl
143            | ExtensionCapability::DeviceTemplateRegister
144            | ExtensionCapability::DeviceRegister
145            | ExtensionCapability::DeviceUnregister => "device".to_string(),
146            ExtensionCapability::StorageQuery => "storage".to_string(),
147            ExtensionCapability::EventPublish | ExtensionCapability::EventSubscribe => {
148                "event".to_string()
149            }
150            ExtensionCapability::TelemetryHistory | ExtensionCapability::MetricsAggregate => {
151                "telemetry".to_string()
152            }
153            ExtensionCapability::ExtensionCall => "extension".to_string(),
154            ExtensionCapability::AgentTrigger => "agent".to_string(),
155            ExtensionCapability::RuleTrigger => "rule".to_string(),
156            ExtensionCapability::Custom(_) => "custom".to_string(),
157        }
158    }
159}
160
/// Capability manifest for extension capabilities.
///
/// Returned by `ExtensionCapabilityProvider::capability_manifest` to declare
/// what a provider offers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapabilityManifest {
    /// Capabilities the provider declares. `ExtensionContext::register_provider`
    /// marks each entry as available.
    pub capabilities: Vec<ExtensionCapability>,
    /// API version string; recorded next to each capability at registration time.
    pub api_version: String,
    /// NOTE(review): presumably the minimum NeoMind core version the provider
    /// needs; nothing in the visible code reads it, so confirm.
    pub min_core_version: String,
    /// Package name carried by the manifest. Note that `register_provider` keys
    /// providers by its own `package_name` argument, not by this field.
    pub package_name: String,
}
169
170// ============================================================================
171// Capability Provider
172// ============================================================================
173
/// Error type for capability operations.
#[derive(Debug, thiserror::Error)]
pub enum CapabilityError {
    /// The requested capability is not available.
    #[error("Capability not available: {0:?}")]
    NotAvailable(ExtensionCapability),
    /// A provider-side failure, described by a free-form message.
    #[error("Provider error: {0}")]
    ProviderError(String),
    /// The supplied parameters were rejected, described by a free-form message.
    #[error("Invalid parameters: {0}")]
    InvalidParameters(String),
    /// No provider is registered for the requested capability.
    #[error("Provider not found for capability: {0:?}")]
    ProviderNotFound(ExtensionCapability),
}
186
/// Trait for capability providers.
///
/// Implementors must be `Send + Sync`; `ExtensionContext` stores them as
/// `Arc<dyn ExtensionCapabilityProvider>`.
#[async_trait]
pub trait ExtensionCapabilityProvider: Send + Sync {
    /// Returns the manifest listing the capabilities this provider offers.
    fn capability_manifest(&self) -> CapabilityManifest;

    /// Executes `capability` with the JSON `params` and returns a JSON result,
    /// or a `CapabilityError` on failure.
    async fn invoke_capability(
        &self,
        capability: ExtensionCapability,
        params: &serde_json::Value,
    ) -> std::result::Result<serde_json::Value, CapabilityError>;
}
198
199// ============================================================================
200// Extension Context
201// ============================================================================
202
/// Extension context configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExtensionContextConfig {
    /// Base URL of the API; falls back to an empty string when absent from the
    /// deserialized input.
    #[serde(default)]
    pub api_base_url: String,
    /// API version string (`"v1"` in `Default`). Required when deserializing.
    pub api_version: String,
    /// Identifier of the extension that owns the context. Required when deserializing.
    pub extension_id: String,
    /// Optional rate limit; `None` when absent.
    /// NOTE(review): the unit (requests per second/minute?) is not established
    /// by the visible code — confirm.
    #[serde(default)]
    pub rate_limit: Option<usize>,
}
213
214impl Default for ExtensionContextConfig {
215    fn default() -> Self {
216        Self {
217            api_base_url: String::new(),
218            api_version: "v1".to_string(),
219            extension_id: String::new(),
220            rate_limit: None,
221        }
222    }
223}
224
/// Available capabilities registry.
///
/// Maps each registered capability to the provider that serves it.
#[derive(Debug, Clone, Default)]
pub struct AvailableCapabilities {
    /// Capability -> `(package_name, api_version)` of its provider.
    capabilities: HashMap<ExtensionCapability, (String, String)>,
}
230
231impl AvailableCapabilities {
232    pub fn new() -> Self {
233        Self {
234            capabilities: HashMap::new(),
235        }
236    }
237
238    pub fn register_capability(
239        &mut self,
240        capability: ExtensionCapability,
241        package_name: String,
242        api_version: String,
243    ) {
244        self.capabilities
245            .insert(capability, (package_name, api_version));
246    }
247
248    pub fn has_capability(&self, capability: &ExtensionCapability) -> bool {
249        self.capabilities.contains_key(capability)
250    }
251
252    pub fn get_provider(&self, capability: &ExtensionCapability) -> Option<(String, String)> {
253        self.capabilities.get(capability).cloned()
254    }
255
256    pub fn list(&self) -> Vec<(ExtensionCapability, String, String)> {
257        self.capabilities
258            .iter()
259            .map(|(cap, (pkg, ver))| (cap.clone(), pkg.clone(), ver.clone()))
260            .collect()
261    }
262}
263
/// Extension context for capability invocation.
///
/// Cloning is cheap with respect to the registries: both are behind `Arc`, so
/// clones share the same capability registry and provider map.
#[derive(Clone)]
pub struct ExtensionContext {
    /// Static configuration, including the owning extension's id.
    config: ExtensionContextConfig,
    /// Registry of capabilities and the package that serves each one.
    available_capabilities: Arc<RwLock<AvailableCapabilities>>,
    /// Providers keyed by package name; the map is supplied by the caller of `new`.
    providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
}
271
272impl ExtensionContext {
273    pub fn new(
274        config: ExtensionContextConfig,
275        providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
276    ) -> Self {
277        Self {
278            config,
279            available_capabilities: Arc::new(RwLock::new(AvailableCapabilities::new())),
280            providers,
281        }
282    }
283
284    pub fn with_defaults(
285        extension_id: String,
286        api_base_url: String,
287        providers: Arc<RwLock<HashMap<String, Arc<dyn ExtensionCapabilityProvider>>>>,
288    ) -> Self {
289        Self::new(
290            ExtensionContextConfig {
291                extension_id,
292                api_base_url,
293                ..Default::default()
294            },
295            providers,
296        )
297    }
298
299    pub fn extension_id(&self) -> &str {
300        &self.config.extension_id
301    }
302
303    pub async fn register_provider(
304        &self,
305        package_name: String,
306        provider: Arc<dyn ExtensionCapabilityProvider>,
307    ) {
308        let manifest = provider.capability_manifest();
309        let mut available = self.available_capabilities.write().await;
310        for capability in &manifest.capabilities {
311            available.register_capability(
312                capability.clone(),
313                package_name.clone(),
314                manifest.api_version.clone(),
315            );
316        }
317        let mut providers = self.providers.write().await;
318        providers.insert(package_name, provider);
319    }
320
321    pub async fn invoke_capability(
322        &self,
323        capability: ExtensionCapability,
324        params: &serde_json::Value,
325    ) -> std::result::Result<serde_json::Value, CapabilityError> {
326        let available = self.available_capabilities.read().await;
327        let (package_name, _) = available
328            .get_provider(&capability)
329            .ok_or_else(|| CapabilityError::ProviderNotFound(capability.clone()))?;
330
331        let providers = self.providers.read().await;
332        let provider = providers.get(&package_name).ok_or_else(|| {
333            CapabilityError::ProviderError(format!("Provider '{}' not found", package_name))
334        })?;
335
336        // Inject extension_id into params for device_register
337        let params = match capability {
338            ExtensionCapability::DeviceRegister => {
339                let mut p = params.as_object().cloned().unwrap_or_default();
340                p.insert("_extension_id".to_string(), serde_json::json!(self.config.extension_id));
341                serde_json::Value::Object(p)
342            }
343            _ => params.clone(),
344        };
345        provider.invoke_capability(capability, &params).await
346    }
347
348    pub async fn has_capability(&self, capability: &ExtensionCapability) -> bool {
349        let available = self.available_capabilities.read().await;
350        available.has_capability(capability)
351    }
352
353    pub async fn list_capabilities(&self) -> Vec<(ExtensionCapability, String, String)> {
354        let available = self.available_capabilities.read().await;
355        available.list()
356    }
357
358    pub fn config(&self) -> &ExtensionContextConfig {
359        &self.config
360    }
361}
362
363// ============================================================================
364// Streaming Types
365// ============================================================================
366
/// Stream direction.
///
/// Serialized as the lowercase strings given by the serde renames; the
/// default is `Upload`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum StreamDirection {
    /// Serialized as `"upload"`. NOTE(review): presumably client -> extension — confirm.
    #[serde(rename = "upload")]
    #[default]
    Upload,
    /// Serialized as `"download"`. NOTE(review): presumably extension -> client — confirm.
    #[serde(rename = "download")]
    Download,
    /// Serialized as `"bidirectional"`.
    #[serde(rename = "bidirectional")]
    Bidirectional,
}
378
/// Stream mode.
///
/// Serialized as the lowercase strings given by the serde renames; the
/// default is `Stateless`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum StreamMode {
    /// Serialized as `"stateless"`.
    #[serde(rename = "stateless")]
    #[default]
    Stateless,
    /// Serialized as `"stateful"`.
    #[serde(rename = "stateful")]
    Stateful,
    /// Serialized as `"push"`.
    #[serde(rename = "push")]
    Push,
}
390
/// Stream data type.
///
/// Describes the payload carried by a stream chunk or result. Use
/// `mime_type` / `from_mime_type` to convert to and from MIME strings.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StreamDataType {
    /// Raw bytes (`application/octet-stream`).
    #[serde(rename = "binary")]
    Binary,
    /// Plain text (`text/plain`).
    #[serde(rename = "text")]
    Text,
    /// JSON document (`application/json`).
    #[serde(rename = "json")]
    Json,
    /// Image data; `format` is a name such as `"jpeg"` or `"png"`.
    #[serde(rename = "image")]
    Image { format: String },
    /// Audio data; `format` is a name such as `"pcm"` or `"mp3"`.
    /// NOTE(review): `sample_rate` is presumably in Hz — confirm.
    #[serde(rename = "audio")]
    Audio {
        format: String,
        sample_rate: u32,
        channels: u16,
    },
    /// Video data; `codec` is a name such as `"h264"` or `"vp9"`.
    /// NOTE(review): `width`/`height` are presumably pixels and `fps` frames
    /// per second — confirm.
    #[serde(rename = "video")]
    Video {
        codec: String,
        width: u32,
        height: u32,
        fps: u32,
    },
    /// Sensor data; rendered as `application/x-sensor.<sensor_type>`.
    #[serde(rename = "sensor")]
    Sensor { sensor_type: String },
    /// Any other payload, identified directly by its MIME type.
    #[serde(rename = "custom")]
    Custom { mime_type: String },
}
420
421impl StreamDataType {
422    pub fn mime_type(&self) -> String {
423        match self {
424            StreamDataType::Binary => "application/octet-stream".to_string(),
425            StreamDataType::Text => "text/plain".to_string(),
426            StreamDataType::Json => "application/json".to_string(),
427            StreamDataType::Image { format } => match format.to_lowercase().as_str() {
428                "jpeg" | "jpg" => "image/jpeg".to_string(),
429                "png" => "image/png".to_string(),
430                "gif" => "image/gif".to_string(),
431                "webp" => "image/webp".to_string(),
432                "bmp" => "image/bmp".to_string(),
433                _ => format!("image/{}", format),
434            },
435            StreamDataType::Audio { format, .. } => match format.to_lowercase().as_str() {
436                "pcm" => "audio/pcm".to_string(),
437                "mp3" => "audio/mpeg".to_string(),
438                "aac" => "audio/aac".to_string(),
439                "wav" => "audio/wav".to_string(),
440                "ogg" => "audio/ogg".to_string(),
441                _ => format!("audio/{}", format),
442            },
443            StreamDataType::Video { codec, .. } => match codec.to_lowercase().as_str() {
444                "h264" | "h.264" => "video/h264".to_string(),
445                "h265" | "h.265" | "hevc" => "video/h265".to_string(),
446                "vp8" => "video/vp8".to_string(),
447                "vp9" => "video/vp9".to_string(),
448                "av1" => "video/av1".to_string(),
449                _ => format!("video/{}", codec),
450            },
451            StreamDataType::Sensor { sensor_type } => {
452                format!("application/x-sensor.{}", sensor_type)
453            }
454            StreamDataType::Custom { mime_type } => mime_type.clone(),
455        }
456    }
457
458    pub fn from_mime_type(mime: &str) -> Option<Self> {
459        match mime {
460            "application/octet-stream" => Some(StreamDataType::Binary),
461            "text/plain" => Some(StreamDataType::Text),
462            "application/json" => Some(StreamDataType::Json),
463            m if m.starts_with("image/") => Some(StreamDataType::Image {
464                format: m.strip_prefix("image/")?.to_string(),
465            }),
466            m if m.starts_with("audio/") => Some(StreamDataType::Audio {
467                format: m.strip_prefix("audio/")?.to_string(),
468                sample_rate: 48000,
469                channels: 2,
470            }),
471            m if m.starts_with("video/") => Some(StreamDataType::Video {
472                codec: m.strip_prefix("video/")?.to_string(),
473                width: 1920,
474                height: 1080,
475                fps: 30,
476            }),
477            m if m.starts_with("application/x-sensor.") => Some(StreamDataType::Sensor {
478                sensor_type: m.strip_prefix("application/x-sensor.")?.to_string(),
479            }),
480            _ => Some(StreamDataType::Custom {
481                mime_type: mime.to_string(),
482            }),
483        }
484    }
485}
486
/// Data chunk for streaming.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunk {
    /// Sequence number supplied by the creator of the chunk.
    pub sequence: u64,
    /// Type of the payload held in `data`.
    pub data_type: StreamDataType,
    /// Raw payload bytes.
    pub data: Vec<u8>,
    /// Creation time; the constructors in this file store UTC Unix time in
    /// milliseconds.
    pub timestamp: i64,
    /// Optional free-form metadata; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
    /// Marks the final chunk of a stream; constructors set it to `false` and
    /// `with_last` sets it to `true`.
    pub is_last: bool,
}
498
499impl DataChunk {
500    pub fn binary(sequence: u64, data: Vec<u8>) -> Self {
501        Self {
502            sequence,
503            data_type: StreamDataType::Binary,
504            data,
505            timestamp: chrono::Utc::now().timestamp_millis(),
506            metadata: None,
507            is_last: false,
508        }
509    }
510
511    pub fn text(sequence: u64, text: String) -> Self {
512        Self {
513            sequence,
514            data_type: StreamDataType::Text,
515            data: text.into_bytes(),
516            timestamp: chrono::Utc::now().timestamp_millis(),
517            metadata: None,
518            is_last: false,
519        }
520    }
521
522    pub fn json(
523        sequence: u64,
524        value: serde_json::Value,
525    ) -> std::result::Result<Self, serde_json::Error> {
526        Ok(Self {
527            sequence,
528            data_type: StreamDataType::Json,
529            data: serde_json::to_vec(&value)?,
530            timestamp: chrono::Utc::now().timestamp_millis(),
531            metadata: None,
532            is_last: false,
533        })
534    }
535
536    pub fn image(sequence: u64, data: Vec<u8>, format: String) -> Self {
537        Self {
538            sequence,
539            data_type: StreamDataType::Image { format },
540            data,
541            timestamp: chrono::Utc::now().timestamp_millis(),
542            metadata: None,
543            is_last: false,
544        }
545    }
546
547    pub fn with_last(mut self) -> Self {
548        self.is_last = true;
549        self
550    }
551
552    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
553        self.metadata = Some(metadata);
554        self
555    }
556}
557
/// Stream error.
///
/// Displayed as `[<code>] <message>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamError {
    /// Error code string chosen by the producer of the error.
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// Whether the error is flagged as retryable (`true` from `retryable`,
    /// `false` from `fatal`).
    pub retryable: bool,
}
565
566impl StreamError {
567    /// Create a new stream error.
568    pub fn new(code: impl Into<String>, message: impl Into<String>, retryable: bool) -> Self {
569        Self {
570            code: code.into(),
571            message: message.into(),
572            retryable,
573        }
574    }
575
576    /// Create a fatal (non-retryable) error.
577    pub fn fatal(code: impl Into<String>, message: impl Into<String>) -> Self {
578        Self {
579            code: code.into(),
580            message: message.into(),
581            retryable: false,
582        }
583    }
584
585    /// Create a retryable error.
586    pub fn retryable(code: impl Into<String>, message: impl Into<String>) -> Self {
587        Self {
588            code: code.into(),
589            message: message.into(),
590            retryable: true,
591        }
592    }
593}
594
595impl std::fmt::Display for StreamError {
596    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597        write!(f, "[{}] {}", self.code, self.message)
598    }
599}
600
/// Stream result.
///
/// Optional fields are omitted from serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamResult {
    /// Sequence number of the input this result relates to, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub input_sequence: Option<u64>,
    /// Sequence number of this output (`0` for results built with `error`).
    pub output_sequence: u64,
    /// Payload bytes (empty for error results).
    pub data: Vec<u8>,
    /// Type of the payload in `data` (`Binary` for error results).
    pub data_type: StreamDataType,
    /// Processing time reported by the producer.
    /// NOTE(review): presumably milliseconds, going by the field name — confirm.
    pub processing_ms: f32,
    /// Optional free-form metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
    /// Set when the result represents a failure.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<StreamError>,
}
615
616impl StreamResult {
617    pub fn success(
618        input_sequence: Option<u64>,
619        output_sequence: u64,
620        data: Vec<u8>,
621        data_type: StreamDataType,
622        processing_ms: f32,
623    ) -> Self {
624        Self {
625            input_sequence,
626            output_sequence,
627            data,
628            data_type,
629            processing_ms,
630            metadata: None,
631            error: None,
632        }
633    }
634
635    pub fn json(
636        input_sequence: Option<u64>,
637        output_sequence: u64,
638        value: serde_json::Value,
639        processing_ms: f32,
640    ) -> std::result::Result<Self, serde_json::Error> {
641        Ok(Self::success(
642            input_sequence,
643            output_sequence,
644            serde_json::to_vec(&value)?,
645            StreamDataType::Json,
646            processing_ms,
647        ))
648    }
649
650    /// Create an error result with minimal arguments.
651    pub fn error(input_sequence: Option<u64>, error: StreamError) -> Self {
652        Self {
653            input_sequence,
654            output_sequence: 0,
655            data: Vec::new(),
656            data_type: StreamDataType::Binary,
657            processing_ms: 0.0,
658            metadata: None,
659            error: Some(error),
660        }
661    }
662
663    /// Create an error result with full arguments.
664    pub fn error_with_details(
665        input_sequence: Option<u64>,
666        output_sequence: u64,
667        error: StreamError,
668        processing_ms: f32,
669    ) -> Self {
670        Self {
671            input_sequence,
672            output_sequence,
673            data: Vec::new(),
674            data_type: StreamDataType::Binary,
675            processing_ms,
676            metadata: None,
677            error: Some(error),
678        }
679    }
680
681    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
682        self.metadata = Some(metadata);
683        self
684    }
685
686    /// Parse the data as JSON.
687    pub fn as_json(&self) -> std::result::Result<serde_json::Value, serde_json::Error> {
688        serde_json::from_slice(&self.data)
689    }
690
691    /// Parse the data as text.
692    pub fn as_text(&self) -> std::result::Result<&str, std::str::Utf8Error> {
693        std::str::from_utf8(&self.data)
694    }
695}
696
/// Flow control configuration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct FlowControl {
    /// Whether backpressure is supported (`true` in `default_stream`).
    pub supports_backpressure: bool,
    /// Flow-control window size (`64 * 1024` in `default_stream`).
    /// NOTE(review): presumably bytes — confirm.
    pub window_size: u32,
    /// Whether throttling is supported (`false` in `default_stream`).
    pub supports_throttling: bool,
    /// Maximum rate (`0` in `default_stream`).
    /// NOTE(review): unit, and whether `0` means "unlimited", are not
    /// established by the visible code — confirm.
    pub max_rate: u32,
}
705
706impl FlowControl {
707    pub fn default_stream() -> Self {
708        Self {
709            supports_backpressure: true,
710            window_size: 64 * 1024,
711            supports_throttling: false,
712            max_rate: 0,
713        }
714    }
715}
716
717impl Default for FlowControl {
718    fn default() -> Self {
719        Self::default_stream()
720    }
721}
722
/// Stream capability descriptor.
///
/// The preset constructors (`push`, `upload`, `download`, `stateful`) fill in
/// common values; the derived `Default` yields empty/zero values instead.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct StreamCapability {
    /// Payload types accepted or produced by the stream.
    pub supported_data_types: Vec<StreamDataType>,
    /// Largest chunk size. NOTE(review): presumably bytes — confirm.
    pub max_chunk_size: usize,
    /// Preferred chunk size. NOTE(review): presumably bytes — confirm.
    pub preferred_chunk_size: usize,
    /// Maximum number of concurrent sessions.
    pub max_concurrent_sessions: usize,
    /// Stream mode (stateless, stateful, or push).
    pub mode: StreamMode,
    /// Stream direction (upload, download, or bidirectional).
    pub direction: StreamDirection,
    /// Flow-control settings.
    pub flow_control: FlowControl,
    /// Optional JSON schema for stream configuration
    #[serde(default)]
    pub config_schema: Option<serde_json::Value>,
}
737
738impl StreamCapability {
739    /// Create a push capability with common defaults.
740    pub fn push() -> Self {
741        Self {
742            supported_data_types: vec![StreamDataType::Binary],
743            max_chunk_size: 64 * 1024,
744            preferred_chunk_size: 16 * 1024,
745            max_concurrent_sessions: 5,
746            mode: StreamMode::Push,
747            direction: StreamDirection::Download,
748            flow_control: FlowControl::default(),
749            config_schema: None,
750        }
751    }
752
753    /// Create an upload capability with common defaults.
754    pub fn upload() -> Self {
755        Self {
756            supported_data_types: vec![StreamDataType::Binary],
757            max_chunk_size: 1024 * 1024,
758            preferred_chunk_size: 64 * 1024,
759            max_concurrent_sessions: 5,
760            mode: StreamMode::Stateless,
761            direction: StreamDirection::Upload,
762            flow_control: FlowControl::default(),
763            config_schema: None,
764        }
765    }
766
767    /// Create a download capability with common defaults.
768    pub fn download() -> Self {
769        Self {
770            supported_data_types: vec![StreamDataType::Binary],
771            max_chunk_size: 1024 * 1024,
772            preferred_chunk_size: 64 * 1024,
773            max_concurrent_sessions: 5,
774            mode: StreamMode::Stateless,
775            direction: StreamDirection::Download,
776            flow_control: FlowControl::default(),
777            config_schema: None,
778        }
779    }
780
781    /// Create a stateful capability with common defaults.
782    pub fn stateful() -> Self {
783        Self {
784            supported_data_types: vec![StreamDataType::Binary],
785            max_chunk_size: 1024 * 1024,
786            preferred_chunk_size: 64 * 1024,
787            max_concurrent_sessions: 5,
788            mode: StreamMode::Stateful,
789            direction: StreamDirection::Bidirectional,
790            flow_control: FlowControl::default(),
791            config_schema: None,
792        }
793    }
794
795    /// Add a supported data type.
796    pub fn with_data_type(mut self, data_type: StreamDataType) -> Self {
797        self.supported_data_types.push(data_type);
798        self
799    }
800
801    /// Set chunk size constraints.
802    pub fn with_chunk_size(mut self, preferred: usize, max: usize) -> Self {
803        self.preferred_chunk_size = preferred;
804        self.max_chunk_size = max;
805        self
806    }
807}
808
/// Client information.
///
/// Attached to a `StreamSession` to describe the client that opened it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientInfo {
    /// Identifier of the client.
    pub client_id: String,
    /// Client IP address as text, when known.
    pub ip_addr: Option<String>,
    /// Client user-agent string, when known.
    pub user_agent: Option<String>,
}
816
/// Stream session.
///
/// Holds identity, configuration, timing, and traffic counters for one session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSession {
    /// Session identifier.
    pub id: String,
    /// Identifier of the extension the session belongs to.
    pub extension_id: String,
    /// Session configuration as free-form JSON.
    pub config: serde_json::Value,
    /// Creation time; `StreamSession::new` stores UTC Unix time in milliseconds.
    pub started_at: i64,
    /// Time of the last activity; initialized by `new` to the same value as
    /// `started_at`.
    pub last_activity: i64,
    /// Inbound byte counter (starts at 0).
    pub bytes_in: u64,
    /// Outbound byte counter (starts at 0).
    pub bytes_out: u64,
    /// Inbound chunk counter (starts at 0).
    pub chunks_in: u64,
    /// Outbound chunk counter (starts at 0).
    pub chunks_out: u64,
    /// Information about the client; `new` always sets it to `Some`.
    pub client_info: Option<ClientInfo>,
    /// Optional free-form metadata; `None` after `new`.
    pub metadata: Option<serde_json::Value>,
}
832
833impl StreamSession {
834    pub fn new(
835        id: String,
836        extension_id: String,
837        config: serde_json::Value,
838        client_info: ClientInfo,
839    ) -> Self {
840        let now = chrono::Utc::now().timestamp_millis();
841        Self {
842            id,
843            extension_id,
844            config,
845            started_at: now,
846            last_activity: now,
847            bytes_in: 0,
848            bytes_out: 0,
849            chunks_in: 0,
850            chunks_out: 0,
851            client_info: Some(client_info),
852            metadata: None,
853        }
854    }
855
856    /// Get the age of this session in seconds.
857    pub fn age_secs(&self) -> i64 {
858        let now = chrono::Utc::now().timestamp();
859        (now - self.started_at).max(0)
860    }
861
862    /// Get session age in milliseconds.
863    pub fn age_ms(&self) -> i64 {
864        let now = chrono::Utc::now().timestamp_millis();
865        let started_ms = self.started_at * 1000;
866        (now - started_ms).max(0)
867    }
868}
869
/// Session statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionStats {
    /// Number of input chunks recorded via `record_input`.
    pub input_chunks: u64,
    /// Number of output chunks recorded via `record_output`.
    pub output_chunks: u64,
    /// Total input bytes recorded via `record_input`.
    pub input_bytes: u64,
    /// Total output bytes recorded via `record_output`.
    pub output_bytes: u64,
    /// Number of errors recorded via `record_error`.
    pub errors: u64,
    /// Time of the most recent recorded event; the methods in this file store
    /// UTC Unix time in seconds.
    pub last_activity: i64,
}
880
881impl Default for SessionStats {
882    fn default() -> Self {
883        Self {
884            input_chunks: 0,
885            output_chunks: 0,
886            input_bytes: 0,
887            output_bytes: 0,
888            errors: 0,
889            last_activity: chrono::Utc::now().timestamp(),
890        }
891    }
892}
893
894impl SessionStats {
895    /// Record an error, incrementing the error counter.
896    pub fn record_error(&mut self) {
897        self.errors += 1;
898        self.last_activity = chrono::Utc::now().timestamp();
899    }
900
901    /// Record input data.
902    pub fn record_input(&mut self, bytes: u64) {
903        self.input_chunks += 1;
904        self.input_bytes += bytes;
905        self.last_activity = chrono::Utc::now().timestamp();
906    }
907
908    /// Record output data.
909    pub fn record_output(&mut self, bytes: u64) {
910        self.output_chunks += 1;
911        self.output_bytes += bytes;
912        self.last_activity = chrono::Utc::now().timestamp();
913    }
914}
915
916// ============================================================================
917// Event System
918// ============================================================================
919
/// Event filter for subscriptions.
///
/// Every field is optional; a field left as `None` places no constraint on
/// events. The derived `Default` yields a filter with no constraints.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventFilter {
    /// Required value of the event's `source` field.
    pub source: Option<String>,
    /// Required value of the event's `device_id` field.
    pub device_id: Option<String>,
    /// Extension identifier to filter on.
    pub extension_id: Option<String>,
    /// Agent identifier to filter on.
    pub agent_id: Option<String>,
    /// Rule identifier to filter on.
    pub rule_id: Option<String>,
    /// Workflow identifier to filter on.
    pub workflow_id: Option<String>,
    /// Free-form filter expression; its syntax is not defined in this file.
    pub expression: Option<String>,
}
931
932impl EventFilter {
933    pub fn new() -> Self {
934        Self::default()
935    }
936
937    pub fn by_source(mut self, source: impl Into<String>) -> Self {
938        self.source = Some(source.into());
939        self
940    }
941
942    pub fn by_device_id(mut self, device_id: impl Into<String>) -> Self {
943        self.device_id = Some(device_id.into());
944        self
945    }
946
947    pub fn by_extension_id(mut self, extension_id: impl Into<String>) -> Self {
948        self.extension_id = Some(extension_id.into());
949        self
950    }
951
952    pub fn matches(&self, _event_type: &str, event_value: &serde_json::Value) -> bool {
953        if let Some(ref source) = self.source {
954            if event_value.get("source").and_then(|v| v.as_str()) != Some(source.as_str()) {
955                return false;
956            }
957        }
958        if let Some(ref device_id) = self.device_id {
959            if event_value.get("device_id").and_then(|v| v.as_str()) != Some(device_id.as_str()) {
960                return false;
961            }
962        }
963        true
964    }
965}
966
/// Event subscription configuration.
///
/// Describes which event types a subscriber wants and how they are filtered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventSubscription {
    /// Event type names to receive. An empty list means "all event types"
    /// (see `EventSubscription::is_subscribed`).
    pub event_types: Vec<String>,
    /// Optional additional filter applied to events.
    pub filters: Option<EventFilter>,
    /// Maximum buffer size for this subscription; defaults to 1000.
    pub max_buffer_size: usize,
    /// When `false`, `is_subscribed` reports `false` for every event type.
    pub enabled: bool,
}
975
976impl Default for EventSubscription {
977    fn default() -> Self {
978        Self {
979            event_types: Vec::new(),
980            filters: None,
981            max_buffer_size: 1000,
982            enabled: true,
983        }
984    }
985}
986
987impl EventSubscription {
988    pub fn new() -> Self {
989        Self::default()
990    }
991
992    pub fn with_types(event_types: Vec<String>) -> Self {
993        Self {
994            event_types,
995            filters: None,
996            max_buffer_size: 1000,
997            enabled: true,
998        }
999    }
1000
1001    pub fn with_filters(mut self, filters: EventFilter) -> Self {
1002        self.filters = Some(filters);
1003        self
1004    }
1005
1006    pub fn is_subscribed(&self, event_type: &str) -> bool {
1007        if !self.enabled {
1008            return false;
1009        }
1010        if self.event_types.is_empty() {
1011            return true;
1012        }
1013        self.event_types.iter().any(|et| et == event_type)
1014    }
1015}
1016
1017// ============================================================================
1018// Capability Context (for FFI)
1019// ============================================================================
1020
/// Host callback that services a capability request.
///
/// Receives a pointer/length pair describing the request bytes and returns a
/// pointer to a C string holding the response (null signals failure to the
/// caller in this module).
pub type NativeCapabilityInvokeFn = unsafe extern "C" fn(*const u8, usize) -> *mut c_char;
/// Host callback that releases a response pointer previously returned by a
/// [`NativeCapabilityInvokeFn`].
pub type NativeCapabilityFreeFn = unsafe extern "C" fn(*mut c_char);

/// The pair of host callbacks used to invoke capabilities across the FFI
/// boundary.
#[derive(Clone, Copy)]
struct NativeCapabilityBridge {
    invoke: NativeCapabilityInvokeFn,
    free: NativeCapabilityFreeFn,
}

/// Process-wide bridge; written at most once.
static NATIVE_CAPABILITY_BRIDGE: OnceLock<NativeCapabilityBridge> = OnceLock::new();

/// Set the native capability bridge for FFI.
///
/// Only the first call has any effect: once a bridge is installed, later calls
/// are silently ignored and the original callbacks stay in place.
pub fn set_native_capability_bridge(
    invoke: NativeCapabilityInvokeFn,
    free: NativeCapabilityFreeFn,
) {
    NATIVE_CAPABILITY_BRIDGE.get_or_init(|| NativeCapabilityBridge { invoke, free });
}
1040
1041// ============================================================================
1042// Push Output Writer (for Push mode data flow)
1043// ============================================================================
1044
/// Function pointer type for writing push output from extension → runner.
///
/// The callback receives a pointer/length pair describing the serialized
/// message and returns a status code; `send_push_output` treats `0` as success
/// and any other value as failure. Registering it lets the extension push data
/// without going through the JSON FFI round-trip.
pub type PushOutputWriterFn = unsafe extern "C" fn(*const u8, usize) -> i32;

/// Process-wide push-output writer; written at most once.
static PUSH_WRITER: OnceLock<PushOutputWriterFn> = OnceLock::new();

/// Install the push-output writer callback.
///
/// Called by the generated FFI registration function. This function returns
/// nothing; only the first call takes effect, and later calls are silently
/// ignored so the originally installed writer stays in place.
pub fn set_push_output_writer(writer: PushOutputWriterFn) {
    PUSH_WRITER.get_or_init(|| writer);
}
1057
1058/// Send a push-output message from the extension to the host.
1059///
1060/// The extension calls this during Push mode to emit data chunks.
1061/// Returns `Ok(())` on success or an error if no writer is registered.
1062pub fn send_push_output(msg: &PushOutputMessage) -> crate::ipc_types::Result<()> {
1063    let writer = PUSH_WRITER.get().ok_or_else(|| {
1064        crate::ipc_types::ExtensionError::InternalError("push output writer not registered".into())
1065    })?;
1066    let json = serde_json::to_vec(msg).map_err(|e| {
1067        crate::ipc_types::ExtensionError::InternalError(format!(
1068            "failed to serialize PushOutputMessage: {}",
1069            e
1070        ))
1071    })?;
1072    let rc = unsafe { writer(json.as_ptr(), json.len()) };
1073    if rc == 0 {
1074        Ok(())
1075    } else {
1076        Err(crate::ipc_types::ExtensionError::InternalError(format!(
1077            "push_output_writer returned {}",
1078            rc
1079        )))
1080    }
1081}
1082
1083/// Block on an async future synchronously.
1084/// Only available on native targets (requires tokio runtime).
1085#[cfg(not(target_arch = "wasm32"))]
1086fn block_on_sync<F, T>(future: F) -> std::result::Result<T, CapabilityError>
1087where
1088    F: std::future::Future<Output = std::result::Result<T, CapabilityError>>,
1089{
1090    match tokio::runtime::Handle::try_current() {
1091        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(future)),
1092        Err(_) => {
1093            let runtime = tokio::runtime::Runtime::new().map_err(|e| {
1094                CapabilityError::ProviderError(format!("failed to create tokio runtime: {}", e))
1095            })?;
1096            runtime.block_on(future)
1097        }
1098    }
1099}
1100
/// Capability context for invoking capabilities from extensions.
/// Only available on native targets (requires tokio runtime).
///
/// Cloning is cheap: clones share the same underlying slot through the `Arc`.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone)]
pub struct CapabilityContext {
    /// Shared, lock-protected slot holding the extension context, if one was
    /// supplied. When it is `None`, `invoke_capability` falls back to the
    /// native capability bridge.
    ctx: Arc<RwLock<Option<ExtensionContext>>>,
}
1108
1109#[cfg(not(target_arch = "wasm32"))]
1110impl Default for CapabilityContext {
1111    fn default() -> Self {
1112        Self {
1113            ctx: Arc::new(RwLock::new(None)),
1114        }
1115    }
1116}
1117
1118#[cfg(not(target_arch = "wasm32"))]
1119impl CapabilityContext {
1120    pub fn from_context(context: ExtensionContext) -> Self {
1121        Self {
1122            ctx: Arc::new(RwLock::new(Some(context))),
1123        }
1124    }
1125
1126    pub fn invoke_capability(
1127        &self,
1128        capability_name: &str,
1129        params: &serde_json::Value,
1130    ) -> serde_json::Value {
1131        let capability = match ExtensionCapability::from_name(capability_name) {
1132            Some(capability) => capability,
1133            None => {
1134                return serde_json::json!({
1135                    "success": false,
1136                    "error": format!("Unknown capability: {}", capability_name),
1137                });
1138            }
1139        };
1140
1141        let context = match block_on_sync(async {
1142            Ok::<Option<ExtensionContext>, CapabilityError>(self.ctx.read().await.clone())
1143        }) {
1144            Ok(context) => context,
1145            Err(error) => {
1146                return serde_json::json!({
1147                    "success": false,
1148                    "error": error.to_string(),
1149                });
1150            }
1151        };
1152
1153        if let Some(context) = context {
1154            return match block_on_sync(async {
1155                context.invoke_capability(capability, params).await
1156            }) {
1157                Ok(value) => value,
1158                Err(error) => serde_json::json!({
1159                    "success": false,
1160                    "error": error.to_string(),
1161                }),
1162            };
1163        }
1164
1165        let Some(bridge) = NATIVE_CAPABILITY_BRIDGE.get().copied() else {
1166            return serde_json::json!({
1167                "success": false,
1168                "error": "native capability bridge is not initialized",
1169            });
1170        };
1171
1172        let input = match serde_json::to_vec(&serde_json::json!({
1173            "capability": capability_name,
1174            "params": params,
1175        })) {
1176            Ok(input) => input,
1177            Err(error) => {
1178                return serde_json::json!({
1179                    "success": false,
1180                    "error": format!("failed to serialize capability request: {}", error),
1181                });
1182            }
1183        };
1184
1185        let ptr = unsafe { (bridge.invoke)(input.as_ptr(), input.len()) };
1186        if ptr.is_null() {
1187            return serde_json::json!({
1188                "success": false,
1189                "error": "native capability bridge returned null",
1190            });
1191        }
1192
1193        let response = unsafe { CStr::from_ptr(ptr) }.to_string_lossy().to_string();
1194        unsafe { (bridge.free)(ptr) };
1195
1196        serde_json::from_str(&response).unwrap_or_else(|error| {
1197            serde_json::json!({
1198                "success": false,
1199                "error": format!("failed to parse capability bridge response: {}", error),
1200            })
1201        })
1202    }
1203}
1204
1205// ============================================================================
1206// Extension Trait
1207// ============================================================================
1208
1209/// Core extension trait that all NeoMind extensions must implement.
1210#[async_trait]
1211pub trait Extension: Send + Sync {
1212    /// Returns the extension metadata.
1213    fn metadata(&self) -> &ExtensionMetadata;
1214
1215    /// Returns an optional descriptor with commands and metrics.
1216    fn descriptor(&self) -> Option<ExtensionDescriptor> {
1217        None
1218    }
1219
1220    /// Initialize the extension.
1221    fn init(&mut self) -> Result<()> {
1222        Ok(())
1223    }
1224
1225    /// Start the extension.
1226    fn start(&mut self) -> Result<()> {
1227        Ok(())
1228    }
1229
1230    /// Stop the extension.
1231    fn stop(&mut self) -> Result<()> {
1232        Ok(())
1233    }
1234
1235    /// Get extension status.
1236    fn status(&self) -> String {
1237        "unknown".to_string()
1238    }
1239
1240    /// Returns metrics provided by this extension.
1241    fn metrics(&self) -> Vec<MetricDescriptor> {
1242        Vec::new()
1243    }
1244
1245    /// Returns commands provided by this extension.
1246    fn commands(&self) -> Vec<CommandDescriptor> {
1247        Vec::new()
1248    }
1249
1250    /// Produce current metric values.
1251    fn produce_metrics(&self) -> Result<Vec<ExtensionMetricValue>> {
1252        Ok(Vec::new())
1253    }
1254
1255    /// Health check.
1256    async fn health_check(&self) -> Result<bool> {
1257        Ok(true)
1258    }
1259
1260    /// Configure the extension.
1261    async fn configure(&mut self, _config: &serde_json::Value) -> Result<()> {
1262        Ok(())
1263    }
1264
1265    /// Get extension statistics.
1266    fn get_stats(&self) -> ExtensionStats {
1267        ExtensionStats::default()
1268    }
1269
1270    /// Get latest output for push mode.
1271    fn latest_output(&self) -> Option<PushOutputMessage> {
1272        None
1273    }
1274
1275    /// Get stream capability.
1276    fn stream_capability(&self) -> Option<StreamCapability> {
1277        None
1278    }
1279
1280    /// Execute a command.
1281    async fn execute_command(
1282        &self,
1283        command_name: &str,
1284        args: &serde_json::Value,
1285    ) -> Result<serde_json::Value> {
1286        let _ = args;
1287        Err(ExtensionError::CommandNotFound(command_name.to_string()))
1288    }
1289
1290    /// Initialize a streaming session.
1291    async fn init_session(&self, _session: &StreamSession) -> Result<()> {
1292        Ok(())
1293    }
1294
1295    /// Process a chunk in a session.
1296    async fn process_session_chunk(
1297        &self,
1298        _session_id: &str,
1299        _chunk: DataChunk,
1300    ) -> Result<StreamResult> {
1301        Err(ExtensionError::ExecutionFailed(
1302            "Session streaming not supported".to_string(),
1303        ))
1304    }
1305
1306    /// Close a streaming session.
1307    async fn close_session(&self, _session_id: &str) -> Result<SessionStats> {
1308        Ok(SessionStats::default())
1309    }
1310
1311    /// Process a single chunk (stateless).
1312    async fn process_chunk(&self, _chunk: DataChunk) -> Result<StreamResult> {
1313        Err(ExtensionError::ExecutionFailed(
1314            "Streaming not supported".to_string(),
1315        ))
1316    }
1317
1318    /// Start push mode.
1319    async fn start_push(&self, _session_id: &str) -> Result<()> {
1320        Ok(())
1321    }
1322
1323    /// Stop push mode.
1324    async fn stop_push(&self, _session_id: &str) -> Result<()> {
1325        Ok(())
1326    }
1327
1328    /// Set output sender for push mode.
1329    /// Not available on WASM target (requires tokio).
1330    #[cfg(not(target_arch = "wasm32"))]
1331    fn set_output_sender(&self, _sender: Arc<tokio::sync::mpsc::Sender<PushOutputMessage>>) {}
1332
1333    /// Get event subscriptions.
1334    fn event_subscriptions(&self) -> &[&str] {
1335        &[]
1336    }
1337
1338    /// Handle an event.
1339    fn handle_event(&self, _event_type: &str, _payload: &serde_json::Value) -> Result<()> {
1340        Ok(())
1341    }
1342
1343    /// Called when extension is unloaded.
1344    async fn on_unload(&self) -> Result<()> {
1345        Ok(())
1346    }
1347
1348    /// Get as Any for downcasting.
1349    fn as_any(&self) -> &dyn std::any::Any;
1350}
1351
1352// ============================================================================
1353// Re-exports for compatibility
1354// ============================================================================