Skip to main content

strike48_connector/
types.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3
4/// Payload encoding types (matches protobuf enum)
5#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
6#[repr(i32)]
7pub enum PayloadEncoding {
8    Unspecified = 0,
9    Json = 1,
10    RawBytes = 2,
11    ArrowIpc = 3,
12    JsonLines = 4,
13    Protobuf = 5,
14    Msgpack = 6,
15    Parquet = 7,
16}
17
18impl From<i32> for PayloadEncoding {
19    fn from(value: i32) -> Self {
20        match value {
21            1 => PayloadEncoding::Json,
22            2 => PayloadEncoding::RawBytes,
23            3 => PayloadEncoding::ArrowIpc,
24            4 => PayloadEncoding::JsonLines,
25            5 => PayloadEncoding::Protobuf,
26            6 => PayloadEncoding::Msgpack,
27            7 => PayloadEncoding::Parquet,
28            _ => PayloadEncoding::Unspecified,
29        }
30    }
31}
32
33impl From<PayloadEncoding> for i32 {
34    fn from(encoding: PayloadEncoding) -> Self {
35        encoding as i32
36    }
37}
38
39/// Connector behavior patterns
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41#[repr(i32)]
42pub enum ConnectorBehavior {
43    Unspecified = 0,
44    Source = 1,
45    Sink = 2,
46    Tool = 3,
47    Pubsub = 4,
48    RequestResponse = 5,
49    App = 6,
50}
51
52impl From<i32> for ConnectorBehavior {
53    fn from(value: i32) -> Self {
54        match value {
55            1 => ConnectorBehavior::Source,
56            2 => ConnectorBehavior::Sink,
57            3 => ConnectorBehavior::Tool,
58            4 => ConnectorBehavior::Pubsub,
59            5 => ConnectorBehavior::RequestResponse,
60            6 => ConnectorBehavior::App,
61            _ => ConnectorBehavior::Unspecified,
62        }
63    }
64}
65
66impl From<ConnectorBehavior> for i32 {
67    fn from(behavior: ConnectorBehavior) -> Self {
68        behavior as i32
69    }
70}
71
72/// Registration status
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74#[repr(i32)]
75pub enum RegistrationStatus {
76    Unspecified = 0,
77    Pending = 1,
78    Approved = 2,
79    Rejected = 3,
80}
81
82impl From<i32> for RegistrationStatus {
83    fn from(value: i32) -> Self {
84        match value {
85            1 => RegistrationStatus::Pending,
86            2 => RegistrationStatus::Approved,
87            3 => RegistrationStatus::Rejected,
88            _ => RegistrationStatus::Unspecified,
89        }
90    }
91}
92
93/// Task type schema for registering capabilities
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TaskTypeSchema {
96    pub task_type_id: String,
97    pub name: String,
98    pub description: String,
99    pub category: String,
100    pub icon: String,
101    pub input_schema_json: String,
102    pub output_schema_json: String,
103}
104
105/// Connector capabilities
106#[derive(Debug, Clone)]
107pub struct ConnectorCapabilities {
108    pub connector_type: String,
109    pub version: String,
110    pub supported_encodings: Vec<PayloadEncoding>,
111    pub behaviors: Vec<ConnectorBehavior>,
112    pub metadata: HashMap<String, String>,
113    pub task_types: Option<Vec<TaskTypeSchema>>,
114}
115
116/// Register connector response
117#[derive(Debug, Clone)]
118pub struct RegisterConnectorResponse {
119    pub success: bool,
120    pub address: String,
121    pub connector_arn: String,
122    pub error: String,
123    pub status: RegistrationStatus,
124    pub session_token: Option<String>,
125}
126
127/// Execute request
128#[derive(Debug, Clone)]
129pub struct ExecuteRequest {
130    pub request_id: String,
131    pub payload: Vec<u8>,
132    pub payload_encoding: PayloadEncoding,
133    pub context: HashMap<String, String>,
134    pub capability_id: Option<String>,
135}
136
137/// Execute response
138#[derive(Debug, Clone)]
139pub struct ExecuteResponse {
140    pub request_id: String,
141    pub success: bool,
142    pub payload: Vec<u8>,
143    pub payload_encoding: PayloadEncoding,
144    pub error: String,
145    pub duration_ms: u64,
146}
147
148/// Connector scope - determines tenant visibility
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
150#[repr(i32)]
151pub enum ConnectorScope {
152    Unspecified = 0, // Infer from tenant_id (backward compatible)
153    Tenant = 1,      // Tenant-scoped
154    Global = 2,      // Global (accessible by all)
155}
156
157impl From<i32> for ConnectorScope {
158    fn from(value: i32) -> Self {
159        match value {
160            1 => ConnectorScope::Tenant,
161            2 => ConnectorScope::Global,
162            _ => ConnectorScope::Unspecified,
163        }
164    }
165}
166
167impl From<ConnectorScope> for i32 {
168    fn from(scope: ConnectorScope) -> Self {
169        scope as i32
170    }
171}
172
173/// Authentication error codes
174#[derive(Debug, Clone, Copy, PartialEq, Eq)]
175#[repr(i32)]
176pub enum AuthError {
177    Unspecified = 0,
178    InvalidToken = 1,
179    ExpiredToken = 2,
180    UntrustedIssuer = 3,
181    MissingTenantId = 4,
182}
183
184impl From<i32> for AuthError {
185    fn from(value: i32) -> Self {
186        match value {
187            1 => AuthError::InvalidToken,
188            2 => AuthError::ExpiredToken,
189            3 => AuthError::UntrustedIssuer,
190            4 => AuthError::MissingTenantId,
191            _ => AuthError::Unspecified,
192        }
193    }
194}
195
196/// Invoke capability request - allows connector-to-connector invocation
197#[derive(Debug, Clone)]
198pub struct InvokeCapabilityRequest {
199    pub request_id: String,
200    pub target_address: String,
201    pub capability_id: Option<String>,
202    pub payload: Vec<u8>,
203    pub payload_encoding: PayloadEncoding,
204    pub context: HashMap<String, String>,
205    pub timeout_ms: Option<u64>,
206    pub fire_and_forget: bool,
207}
208
209/// Invoke capability response
210#[derive(Debug, Clone)]
211pub struct InvokeCapabilityResponse {
212    pub request_id: String,
213    pub success: bool,
214    pub payload: Vec<u8>,
215    pub payload_encoding: PayloadEncoding,
216    pub error: String,
217    pub duration_ms: u64,
218    pub context: Option<HashMap<String, String>>,
219    pub error_details: Option<HashMap<String, String>>,
220}
221
222/// WebSocket frame type (matches protobuf enum)
223#[derive(Debug, Clone, Copy, PartialEq, Eq)]
224#[repr(i32)]
225pub enum WsFrameType {
226    Unspecified = 0,
227    Text = 1,
228    Binary = 2,
229    Ping = 3,
230    Pong = 4,
231}
232
233impl From<i32> for WsFrameType {
234    fn from(value: i32) -> Self {
235        match value {
236            1 => WsFrameType::Text,
237            2 => WsFrameType::Binary,
238            3 => WsFrameType::Ping,
239            4 => WsFrameType::Pong,
240            _ => WsFrameType::Unspecified,
241        }
242    }
243}
244
245/// Request from server to open a WebSocket connection to a backend
246#[derive(Debug, Clone)]
247pub struct WsOpenRequest {
248    pub connection_id: String,
249    pub path: String,
250    pub query_string: String,
251    pub headers: HashMap<String, String>,
252}
253
254/// Response confirming WebSocket connection was opened (or failed)
255#[derive(Debug, Clone)]
256pub struct WsOpenResponse {
257    pub connection_id: String,
258    pub success: bool,
259    pub error: String,
260}
261
262/// A WebSocket frame flowing bidirectionally
263#[derive(Debug, Clone)]
264pub struct WsFrame {
265    pub connection_id: String,
266    pub frame_type: WsFrameType,
267    pub data: Vec<u8>,
268}
269
270/// Request from server to close a WebSocket connection
271#[derive(Debug, Clone)]
272pub struct WsCloseRequest {
273    pub connection_id: String,
274    pub code: i32,
275    pub reason: String,
276}
277
278/// Event message for SOURCE connectors
279#[derive(Debug, Clone)]
280pub struct EventMessage {
281    pub event_id: String,
282    pub payload: Vec<u8>,
283    pub payload_encoding: PayloadEncoding,
284    pub timestamp: String,
285}
286
287/// Batch message for batch event sending
288#[derive(Debug, Clone)]
289pub struct BatchMessage {
290    pub batch_id: String,
291    pub events: Vec<EventMessage>,
292    pub event_count: usize,
293}
294
295/// Connector metrics
296#[derive(Debug, Clone, Default)]
297pub struct ConnectorMetrics {
298    pub requests_received: u64,
299    pub requests_processed: u64,
300    pub requests_failed: u64,
301    pub total_duration_ms: u64,
302    pub bytes_received: u64,
303    pub bytes_sent: u64,
304    /// Timestamp of last request received (Unix ms, 0 if none)
305    pub last_request_at_ms: u64,
306    // Resilience metrics
307    pub reconnection_attempts: u64,
308    pub total_disconnects: u64,
309    pub successful_reconnects: u64,
310    pub last_disconnect_reason: Option<String>,
311    pub last_connected_at_ms: Option<u64>,
312    pub last_disconnected_at_ms: Option<u64>,
313    pub current_backoff_ms: u64,
314    // Heartbeat RTT metrics
315    pub heartbeat_rtt_total_ms: f64,
316    pub heartbeat_rtt_count: u64,
317    pub heartbeat_rtt_last_ms: f64,
318    pub heartbeat_rtt_min_ms: f64,
319    pub heartbeat_rtt_max_ms: f64,
320}
321
322impl ConnectorMetrics {
323    /// Record a heartbeat round-trip time measurement.
324    pub fn record_heartbeat_rtt(&mut self, rtt_ms: f64) {
325        self.heartbeat_rtt_total_ms += rtt_ms;
326        self.heartbeat_rtt_count += 1;
327        self.heartbeat_rtt_last_ms = rtt_ms;
328        if self.heartbeat_rtt_count == 1 || rtt_ms < self.heartbeat_rtt_min_ms {
329            self.heartbeat_rtt_min_ms = rtt_ms;
330        }
331        if rtt_ms > self.heartbeat_rtt_max_ms {
332            self.heartbeat_rtt_max_ms = rtt_ms;
333        }
334    }
335
336    /// Average heartbeat RTT in milliseconds, or 0.0 if no samples.
337    pub fn heartbeat_rtt_avg_ms(&self) -> f64 {
338        if self.heartbeat_rtt_count == 0 {
339            0.0
340        } else {
341            self.heartbeat_rtt_total_ms / self.heartbeat_rtt_count as f64
342        }
343    }
344
345    /// Collect SDK-level metrics for inclusion in `MetricsReport.custom_metrics`.
346    /// All keys are `sdk.*` prefixed to avoid collision with connector-defined metrics.
347    pub fn sdk_custom_metrics(&self) -> std::collections::HashMap<String, f64> {
348        let mut m = crate::system_metrics::collect();
349
350        if self.heartbeat_rtt_count > 0 {
351            m.insert(
352                "sdk.heartbeat_rtt_avg_ms".to_string(),
353                self.heartbeat_rtt_avg_ms(),
354            );
355            m.insert(
356                "sdk.heartbeat_rtt_last_ms".to_string(),
357                self.heartbeat_rtt_last_ms,
358            );
359            m.insert(
360                "sdk.heartbeat_rtt_min_ms".to_string(),
361                self.heartbeat_rtt_min_ms,
362            );
363            m.insert(
364                "sdk.heartbeat_rtt_max_ms".to_string(),
365                self.heartbeat_rtt_max_ms,
366            );
367            m.insert(
368                "sdk.heartbeat_rtt_count".to_string(),
369                self.heartbeat_rtt_count as f64,
370            );
371        }
372        m
373    }
374}
375
376#[cfg(test)]
377mod tests {
378    use super::*;
379
380    #[test]
381    fn test_record_single_rtt() {
382        let mut m = ConnectorMetrics::default();
383        m.record_heartbeat_rtt(15.0);
384
385        assert_eq!(m.heartbeat_rtt_count, 1);
386        assert_eq!(m.heartbeat_rtt_last_ms, 15.0);
387        assert_eq!(m.heartbeat_rtt_min_ms, 15.0);
388        assert_eq!(m.heartbeat_rtt_max_ms, 15.0);
389        assert_eq!(m.heartbeat_rtt_avg_ms(), 15.0);
390    }
391
392    #[test]
393    fn test_record_multiple_rtts() {
394        let mut m = ConnectorMetrics::default();
395        m.record_heartbeat_rtt(10.0);
396        m.record_heartbeat_rtt(20.0);
397        m.record_heartbeat_rtt(30.0);
398
399        assert_eq!(m.heartbeat_rtt_count, 3);
400        assert_eq!(m.heartbeat_rtt_last_ms, 30.0);
401        assert_eq!(m.heartbeat_rtt_min_ms, 10.0);
402        assert_eq!(m.heartbeat_rtt_max_ms, 30.0);
403        assert_eq!(m.heartbeat_rtt_avg_ms(), 20.0);
404    }
405
406    #[test]
407    fn test_rtt_avg_zero_when_no_samples() {
408        let m = ConnectorMetrics::default();
409        assert_eq!(m.heartbeat_rtt_avg_ms(), 0.0);
410        assert_eq!(m.heartbeat_rtt_count, 0);
411    }
412
413    #[test]
414    fn test_rtt_min_max_with_decreasing_values() {
415        let mut m = ConnectorMetrics::default();
416        m.record_heartbeat_rtt(50.0);
417        m.record_heartbeat_rtt(5.0);
418
419        assert_eq!(m.heartbeat_rtt_min_ms, 5.0);
420        assert_eq!(m.heartbeat_rtt_max_ms, 50.0);
421    }
422
423    #[test]
424    fn test_sdk_custom_metrics_has_system_metrics_without_rtts() {
425        let m = ConnectorMetrics::default();
426        let custom = m.sdk_custom_metrics();
427        assert!(
428            custom.contains_key("sdk.system_total_memory_bytes"),
429            "system metrics should always be present"
430        );
431        assert!(
432            !custom.contains_key("sdk.heartbeat_rtt_avg_ms"),
433            "RTT metrics should be absent when no samples"
434        );
435    }
436
437    #[test]
438    fn test_sdk_custom_metrics_includes_rtt_after_samples() {
439        let mut m = ConnectorMetrics::default();
440        m.record_heartbeat_rtt(10.0);
441        m.record_heartbeat_rtt(20.0);
442
443        let custom = m.sdk_custom_metrics();
444        assert_eq!(custom["sdk.heartbeat_rtt_avg_ms"], 15.0);
445        assert_eq!(custom["sdk.heartbeat_rtt_last_ms"], 20.0);
446        assert_eq!(custom["sdk.heartbeat_rtt_min_ms"], 10.0);
447        assert_eq!(custom["sdk.heartbeat_rtt_max_ms"], 20.0);
448        assert_eq!(custom["sdk.heartbeat_rtt_count"], 2.0);
449        assert!(
450            custom.contains_key("sdk.system_total_memory_bytes"),
451            "system metrics should also be present"
452        );
453    }
454
455    #[test]
456    fn test_sdk_custom_metrics_all_keys_prefixed() {
457        let mut m = ConnectorMetrics::default();
458        m.record_heartbeat_rtt(5.0);
459
460        let custom = m.sdk_custom_metrics();
461        for key in custom.keys() {
462            assert!(key.starts_with("sdk."), "key {key} must have sdk. prefix");
463        }
464    }
465}