1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3
4#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
6#[repr(i32)]
7pub enum PayloadEncoding {
8 Unspecified = 0,
9 Json = 1,
10 RawBytes = 2,
11 ArrowIpc = 3,
12 JsonLines = 4,
13 Protobuf = 5,
14 Msgpack = 6,
15 Parquet = 7,
16}
17
18impl From<i32> for PayloadEncoding {
19 fn from(value: i32) -> Self {
20 match value {
21 1 => PayloadEncoding::Json,
22 2 => PayloadEncoding::RawBytes,
23 3 => PayloadEncoding::ArrowIpc,
24 4 => PayloadEncoding::JsonLines,
25 5 => PayloadEncoding::Protobuf,
26 6 => PayloadEncoding::Msgpack,
27 7 => PayloadEncoding::Parquet,
28 _ => PayloadEncoding::Unspecified,
29 }
30 }
31}
32
33impl From<PayloadEncoding> for i32 {
34 fn from(encoding: PayloadEncoding) -> Self {
35 encoding as i32
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41#[repr(i32)]
42pub enum ConnectorBehavior {
43 Unspecified = 0,
44 Source = 1,
45 Sink = 2,
46 Tool = 3,
47 Pubsub = 4,
48 RequestResponse = 5,
49 App = 6,
50}
51
52impl From<i32> for ConnectorBehavior {
53 fn from(value: i32) -> Self {
54 match value {
55 1 => ConnectorBehavior::Source,
56 2 => ConnectorBehavior::Sink,
57 3 => ConnectorBehavior::Tool,
58 4 => ConnectorBehavior::Pubsub,
59 5 => ConnectorBehavior::RequestResponse,
60 6 => ConnectorBehavior::App,
61 _ => ConnectorBehavior::Unspecified,
62 }
63 }
64}
65
66impl From<ConnectorBehavior> for i32 {
67 fn from(behavior: ConnectorBehavior) -> Self {
68 behavior as i32
69 }
70}
71
72#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74#[repr(i32)]
75pub enum RegistrationStatus {
76 Unspecified = 0,
77 Pending = 1,
78 Approved = 2,
79 Rejected = 3,
80}
81
82impl From<i32> for RegistrationStatus {
83 fn from(value: i32) -> Self {
84 match value {
85 1 => RegistrationStatus::Pending,
86 2 => RegistrationStatus::Approved,
87 3 => RegistrationStatus::Rejected,
88 _ => RegistrationStatus::Unspecified,
89 }
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct TaskTypeSchema {
96 pub task_type_id: String,
97 pub name: String,
98 pub description: String,
99 pub category: String,
100 pub icon: String,
101 pub input_schema_json: String,
102 pub output_schema_json: String,
103}
104
105#[derive(Debug, Clone)]
107pub struct ConnectorCapabilities {
108 pub connector_type: String,
109 pub version: String,
110 pub supported_encodings: Vec<PayloadEncoding>,
111 pub behaviors: Vec<ConnectorBehavior>,
112 pub metadata: HashMap<String, String>,
113 pub task_types: Option<Vec<TaskTypeSchema>>,
114}
115
116#[derive(Debug, Clone)]
118pub struct RegisterConnectorResponse {
119 pub success: bool,
120 pub address: String,
121 pub connector_arn: String,
122 pub error: String,
123 pub status: RegistrationStatus,
124 pub session_token: Option<String>,
125}
126
127#[derive(Debug, Clone)]
129pub struct ExecuteRequest {
130 pub request_id: String,
131 pub payload: Vec<u8>,
132 pub payload_encoding: PayloadEncoding,
133 pub context: HashMap<String, String>,
134 pub capability_id: Option<String>,
135}
136
137#[derive(Debug, Clone)]
139pub struct ExecuteResponse {
140 pub request_id: String,
141 pub success: bool,
142 pub payload: Vec<u8>,
143 pub payload_encoding: PayloadEncoding,
144 pub error: String,
145 pub duration_ms: u64,
146}
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
150#[repr(i32)]
151pub enum ConnectorScope {
152 Unspecified = 0, Tenant = 1, Global = 2, }
156
157impl From<i32> for ConnectorScope {
158 fn from(value: i32) -> Self {
159 match value {
160 1 => ConnectorScope::Tenant,
161 2 => ConnectorScope::Global,
162 _ => ConnectorScope::Unspecified,
163 }
164 }
165}
166
167impl From<ConnectorScope> for i32 {
168 fn from(scope: ConnectorScope) -> Self {
169 scope as i32
170 }
171}
172
173#[derive(Debug, Clone, Copy, PartialEq, Eq)]
175#[repr(i32)]
176pub enum AuthError {
177 Unspecified = 0,
178 InvalidToken = 1,
179 ExpiredToken = 2,
180 UntrustedIssuer = 3,
181 MissingTenantId = 4,
182}
183
184impl From<i32> for AuthError {
185 fn from(value: i32) -> Self {
186 match value {
187 1 => AuthError::InvalidToken,
188 2 => AuthError::ExpiredToken,
189 3 => AuthError::UntrustedIssuer,
190 4 => AuthError::MissingTenantId,
191 _ => AuthError::Unspecified,
192 }
193 }
194}
195
196#[derive(Debug, Clone)]
198pub struct InvokeCapabilityRequest {
199 pub request_id: String,
200 pub target_address: String,
201 pub capability_id: Option<String>,
202 pub payload: Vec<u8>,
203 pub payload_encoding: PayloadEncoding,
204 pub context: HashMap<String, String>,
205 pub timeout_ms: Option<u64>,
206 pub fire_and_forget: bool,
207}
208
209#[derive(Debug, Clone)]
211pub struct InvokeCapabilityResponse {
212 pub request_id: String,
213 pub success: bool,
214 pub payload: Vec<u8>,
215 pub payload_encoding: PayloadEncoding,
216 pub error: String,
217 pub duration_ms: u64,
218 pub context: Option<HashMap<String, String>>,
219 pub error_details: Option<HashMap<String, String>>,
220}
221
222#[derive(Debug, Clone, Copy, PartialEq, Eq)]
224#[repr(i32)]
225pub enum WsFrameType {
226 Unspecified = 0,
227 Text = 1,
228 Binary = 2,
229 Ping = 3,
230 Pong = 4,
231}
232
233impl From<i32> for WsFrameType {
234 fn from(value: i32) -> Self {
235 match value {
236 1 => WsFrameType::Text,
237 2 => WsFrameType::Binary,
238 3 => WsFrameType::Ping,
239 4 => WsFrameType::Pong,
240 _ => WsFrameType::Unspecified,
241 }
242 }
243}
244
245#[derive(Debug, Clone)]
247pub struct WsOpenRequest {
248 pub connection_id: String,
249 pub path: String,
250 pub query_string: String,
251 pub headers: HashMap<String, String>,
252}
253
254#[derive(Debug, Clone)]
256pub struct WsOpenResponse {
257 pub connection_id: String,
258 pub success: bool,
259 pub error: String,
260}
261
262#[derive(Debug, Clone)]
264pub struct WsFrame {
265 pub connection_id: String,
266 pub frame_type: WsFrameType,
267 pub data: Vec<u8>,
268}
269
270#[derive(Debug, Clone)]
272pub struct WsCloseRequest {
273 pub connection_id: String,
274 pub code: i32,
275 pub reason: String,
276}
277
278#[derive(Debug, Clone)]
280pub struct EventMessage {
281 pub event_id: String,
282 pub payload: Vec<u8>,
283 pub payload_encoding: PayloadEncoding,
284 pub timestamp: String,
285}
286
287#[derive(Debug, Clone)]
289pub struct BatchMessage {
290 pub batch_id: String,
291 pub events: Vec<EventMessage>,
292 pub event_count: usize,
293}
294
295#[derive(Debug, Clone, Default)]
297pub struct ConnectorMetrics {
298 pub requests_received: u64,
299 pub requests_processed: u64,
300 pub requests_failed: u64,
301 pub total_duration_ms: u64,
302 pub bytes_received: u64,
303 pub bytes_sent: u64,
304 pub last_request_at_ms: u64,
306 pub reconnection_attempts: u64,
308 pub total_disconnects: u64,
309 pub successful_reconnects: u64,
310 pub last_disconnect_reason: Option<String>,
311 pub last_connected_at_ms: Option<u64>,
312 pub last_disconnected_at_ms: Option<u64>,
313 pub current_backoff_ms: u64,
314 pub heartbeat_rtt_total_ms: f64,
316 pub heartbeat_rtt_count: u64,
317 pub heartbeat_rtt_last_ms: f64,
318 pub heartbeat_rtt_min_ms: f64,
319 pub heartbeat_rtt_max_ms: f64,
320}
321
322impl ConnectorMetrics {
323 pub fn record_heartbeat_rtt(&mut self, rtt_ms: f64) {
325 self.heartbeat_rtt_total_ms += rtt_ms;
326 self.heartbeat_rtt_count += 1;
327 self.heartbeat_rtt_last_ms = rtt_ms;
328 if self.heartbeat_rtt_count == 1 || rtt_ms < self.heartbeat_rtt_min_ms {
329 self.heartbeat_rtt_min_ms = rtt_ms;
330 }
331 if rtt_ms > self.heartbeat_rtt_max_ms {
332 self.heartbeat_rtt_max_ms = rtt_ms;
333 }
334 }
335
336 pub fn heartbeat_rtt_avg_ms(&self) -> f64 {
338 if self.heartbeat_rtt_count == 0 {
339 0.0
340 } else {
341 self.heartbeat_rtt_total_ms / self.heartbeat_rtt_count as f64
342 }
343 }
344
345 pub fn sdk_custom_metrics(&self) -> std::collections::HashMap<String, f64> {
348 let mut m = crate::system_metrics::collect();
349
350 if self.heartbeat_rtt_count > 0 {
351 m.insert(
352 "sdk.heartbeat_rtt_avg_ms".to_string(),
353 self.heartbeat_rtt_avg_ms(),
354 );
355 m.insert(
356 "sdk.heartbeat_rtt_last_ms".to_string(),
357 self.heartbeat_rtt_last_ms,
358 );
359 m.insert(
360 "sdk.heartbeat_rtt_min_ms".to_string(),
361 self.heartbeat_rtt_min_ms,
362 );
363 m.insert(
364 "sdk.heartbeat_rtt_max_ms".to_string(),
365 self.heartbeat_rtt_max_ms,
366 );
367 m.insert(
368 "sdk.heartbeat_rtt_count".to_string(),
369 self.heartbeat_rtt_count as f64,
370 );
371 }
372 m
373 }
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379
380 #[test]
381 fn test_record_single_rtt() {
382 let mut m = ConnectorMetrics::default();
383 m.record_heartbeat_rtt(15.0);
384
385 assert_eq!(m.heartbeat_rtt_count, 1);
386 assert_eq!(m.heartbeat_rtt_last_ms, 15.0);
387 assert_eq!(m.heartbeat_rtt_min_ms, 15.0);
388 assert_eq!(m.heartbeat_rtt_max_ms, 15.0);
389 assert_eq!(m.heartbeat_rtt_avg_ms(), 15.0);
390 }
391
392 #[test]
393 fn test_record_multiple_rtts() {
394 let mut m = ConnectorMetrics::default();
395 m.record_heartbeat_rtt(10.0);
396 m.record_heartbeat_rtt(20.0);
397 m.record_heartbeat_rtt(30.0);
398
399 assert_eq!(m.heartbeat_rtt_count, 3);
400 assert_eq!(m.heartbeat_rtt_last_ms, 30.0);
401 assert_eq!(m.heartbeat_rtt_min_ms, 10.0);
402 assert_eq!(m.heartbeat_rtt_max_ms, 30.0);
403 assert_eq!(m.heartbeat_rtt_avg_ms(), 20.0);
404 }
405
406 #[test]
407 fn test_rtt_avg_zero_when_no_samples() {
408 let m = ConnectorMetrics::default();
409 assert_eq!(m.heartbeat_rtt_avg_ms(), 0.0);
410 assert_eq!(m.heartbeat_rtt_count, 0);
411 }
412
413 #[test]
414 fn test_rtt_min_max_with_decreasing_values() {
415 let mut m = ConnectorMetrics::default();
416 m.record_heartbeat_rtt(50.0);
417 m.record_heartbeat_rtt(5.0);
418
419 assert_eq!(m.heartbeat_rtt_min_ms, 5.0);
420 assert_eq!(m.heartbeat_rtt_max_ms, 50.0);
421 }
422
423 #[test]
424 fn test_sdk_custom_metrics_has_system_metrics_without_rtts() {
425 let m = ConnectorMetrics::default();
426 let custom = m.sdk_custom_metrics();
427 assert!(
428 custom.contains_key("sdk.system_total_memory_bytes"),
429 "system metrics should always be present"
430 );
431 assert!(
432 !custom.contains_key("sdk.heartbeat_rtt_avg_ms"),
433 "RTT metrics should be absent when no samples"
434 );
435 }
436
437 #[test]
438 fn test_sdk_custom_metrics_includes_rtt_after_samples() {
439 let mut m = ConnectorMetrics::default();
440 m.record_heartbeat_rtt(10.0);
441 m.record_heartbeat_rtt(20.0);
442
443 let custom = m.sdk_custom_metrics();
444 assert_eq!(custom["sdk.heartbeat_rtt_avg_ms"], 15.0);
445 assert_eq!(custom["sdk.heartbeat_rtt_last_ms"], 20.0);
446 assert_eq!(custom["sdk.heartbeat_rtt_min_ms"], 10.0);
447 assert_eq!(custom["sdk.heartbeat_rtt_max_ms"], 20.0);
448 assert_eq!(custom["sdk.heartbeat_rtt_count"], 2.0);
449 assert!(
450 custom.contains_key("sdk.system_total_memory_bytes"),
451 "system metrics should also be present"
452 );
453 }
454
455 #[test]
456 fn test_sdk_custom_metrics_all_keys_prefixed() {
457 let mut m = ConnectorMetrics::default();
458 m.record_heartbeat_rtt(5.0);
459
460 let custom = m.sdk_custom_metrics();
461 for key in custom.keys() {
462 assert!(key.starts_with("sdk."), "key {key} must have sdk. prefix");
463 }
464 }
465}