Skip to main content

aimdb_core/remote/
metadata.rs

1//! Record metadata types for remote introspection
2
3use core::any::TypeId;
4use serde::{Deserialize, Serialize};
5use std::string::String;
6
7use crate::graph::RecordOrigin;
8use crate::record_id::{RecordId, RecordKey};
9
/// Metadata about a registered record type
///
/// Provides information for remote introspection, including buffer
/// configuration, producer/consumer counts, and timestamps.
///
/// `Option` fields carry `skip_serializing_if = "Option::is_none"`, so they
/// are left out of the serialized output entirely while they are `None`.
///
/// When the `metrics` feature is enabled, additional fields are included
/// for buffer-level statistics (produced_count, consumed_count, etc.).
/// When the `profiling` feature is enabled, `stage_profiling` is included
/// as well.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordMetadata {
    /// Unique record identifier (index in the storage).
    ///
    /// `RecordMetadata::new` fills this with `RecordId::raw()`.
    pub record_id: u32,

    /// Unique record key (stable identifier for lookup), stored as an
    /// owned string copy of the key's `as_str()` value.
    pub record_key: String,

    /// Record type name (Rust type name)
    pub name: String,

    /// `TypeId` of the record type, rendered with its `Debug` formatting
    /// (`format!("{:?}", type_id)` in `RecordMetadata::new`).
    ///
    /// NOTE(review): the exact text is whatever the toolchain's `Debug`
    /// impl for `TypeId` prints; consumers should presumably treat it as an
    /// opaque token rather than parse it — confirm against remote clients.
    pub type_id: String,

    /// How the record gets its values (Source, Link, Transform, Passive)
    pub origin: RecordOrigin,

    /// Buffer type: "spmc_ring", "single_latest", "mailbox", or "none"
    pub buffer_type: String,

    /// Buffer capacity (None for unbounded or no buffer).
    ///
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_capacity: Option<usize>,

    /// Number of registered producer services
    pub producer_count: usize,

    /// Number of registered consumer services
    pub consumer_count: usize,

    /// Whether write operations are permitted for this record
    pub writable: bool,

    /// When the record was registered (ISO 8601)
    pub created_at: String,

    /// Last update timestamp (ISO 8601), None if never updated.
    ///
    /// Starts as `None` in `RecordMetadata::new`; set through
    /// `with_last_update` / `with_last_update_opt`. Omitted from serialized
    /// output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_update: Option<String>,

    /// Number of outbound connector links registered
    pub outbound_connector_count: usize,

    // ===== Buffer metrics (feature-gated) =====
    // All four fields start as `None` and are populated together by
    // `with_buffer_metrics`.
    /// Total items pushed to the buffer (metrics feature only)
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub produced_count: Option<u64>,

    /// Total items consumed from the buffer (metrics feature only)
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub consumed_count: Option<u64>,

    /// Total items dropped due to overflow/lag (metrics feature only)
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dropped_count: Option<u64>,

    /// Current buffer occupancy: (items, capacity) (metrics feature only).
    ///
    /// `with_buffer_metrics` only sets this when the snapshot's capacity
    /// component is greater than zero; otherwise it is left untouched.
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub occupancy: Option<(usize, usize)>,

    // ===== Stage profiling (feature-gated) =====
    /// Per-stage timing metrics (`.source()`/`.tap()`/`.link()`), if the
    /// `profiling` feature is enabled and any stage has been registered.
    ///
    /// `with_stage_profiling` leaves this untouched when given an empty
    /// list, so an empty `Vec` is never stored by that builder.
    #[cfg(feature = "profiling")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stage_profiling: Option<std::vec::Vec<crate::profiling::StageProfilingInfo>>,
}
88
89impl RecordMetadata {
90    /// Creates a new record metadata entry
91    ///
92    /// # Arguments
93    /// * `record_id` - The RecordId index
94    /// * `record_key` - The unique record key
95    /// * `type_id` - The TypeId of the record
96    /// * `name` - The Rust type name
97    /// * `origin` - How the record gets its values (Source, Link, Transform, Passive)
98    /// * `buffer_type` - Buffer type string
99    /// * `buffer_capacity` - Optional buffer capacity
100    /// * `producer_count` - Number of producers
101    /// * `consumer_count` - Number of consumers
102    /// * `writable` - Whether writes are permitted
103    /// * `created_at` - Creation timestamp (ISO 8601)
104    /// * `outbound_connector_count` - Number of outbound connectors
105    #[allow(clippy::too_many_arguments)]
106    pub fn new<K: RecordKey>(
107        record_id: RecordId,
108        record_key: K,
109        type_id: TypeId,
110        name: String,
111        origin: RecordOrigin,
112        buffer_type: String,
113        buffer_capacity: Option<usize>,
114        producer_count: usize,
115        consumer_count: usize,
116        writable: bool,
117        created_at: String,
118        outbound_connector_count: usize,
119    ) -> Self {
120        Self {
121            record_id: record_id.raw(),
122            record_key: record_key.as_str().to_string(),
123            name,
124            type_id: format!("{:?}", type_id),
125            origin,
126            buffer_type,
127            buffer_capacity,
128            producer_count,
129            consumer_count,
130            writable,
131            created_at,
132            last_update: None,
133            outbound_connector_count,
134            #[cfg(feature = "metrics")]
135            produced_count: None,
136            #[cfg(feature = "metrics")]
137            consumed_count: None,
138            #[cfg(feature = "metrics")]
139            dropped_count: None,
140            #[cfg(feature = "metrics")]
141            occupancy: None,
142            #[cfg(feature = "profiling")]
143            stage_profiling: None,
144        }
145    }
146
147    /// Sets the last update timestamp
148    pub fn with_last_update(mut self, timestamp: String) -> Self {
149        self.last_update = Some(timestamp);
150        self
151    }
152
153    /// Sets the last update timestamp from an Option
154    pub fn with_last_update_opt(mut self, timestamp: Option<String>) -> Self {
155        self.last_update = timestamp;
156        self
157    }
158
159    /// Sets buffer metrics from a snapshot (metrics feature only)
160    ///
161    /// Populates produced_count, consumed_count, dropped_count, and occupancy
162    /// from the provided metrics snapshot.
163    #[cfg(feature = "metrics")]
164    pub fn with_buffer_metrics(mut self, snapshot: crate::buffer::BufferMetricsSnapshot) -> Self {
165        self.produced_count = Some(snapshot.produced_count);
166        self.consumed_count = Some(snapshot.consumed_count);
167        self.dropped_count = Some(snapshot.dropped_count);
168        // Only include occupancy if it's meaningful (non-zero capacity)
169        if snapshot.occupancy.1 > 0 {
170            self.occupancy = Some(snapshot.occupancy);
171        }
172        self
173    }
174
175    /// Attaches a stage profiling snapshot (profiling feature only).
176    #[cfg(feature = "profiling")]
177    pub fn with_stage_profiling(
178        mut self,
179        stages: std::vec::Vec<crate::profiling::StageProfilingInfo>,
180    ) -> Self {
181        if !stages.is_empty() {
182            self.stage_profiling = Some(stages);
183        }
184        self
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record_id::StringKey;

    /// `new` must copy every argument into its matching field, flatten the
    /// identifiers, and leave `last_update` unset.
    #[test]
    fn test_record_metadata_creation() {
        let type_id = TypeId::of::<i32>();
        let metadata = RecordMetadata::new(
            RecordId::new(0),
            StringKey::new("test.record"),
            type_id,
            "i32".to_string(),
            RecordOrigin::Source,
            "spmc_ring".to_string(),
            Some(100),
            1,
            2,
            false,
            "2025-10-31T10:00:00.000Z".to_string(),
            0,
        );

        assert_eq!(metadata.record_id, 0);
        assert_eq!(metadata.record_key, "test.record");
        assert_eq!(metadata.name, "i32");
        // The TypeId is stored as its Debug rendering, whatever that looks
        // like on the current toolchain.
        assert_eq!(metadata.type_id, format!("{:?}", type_id));
        assert!(matches!(metadata.origin, RecordOrigin::Source));
        assert_eq!(metadata.buffer_type, "spmc_ring");
        assert_eq!(metadata.buffer_capacity, Some(100));
        assert_eq!(metadata.producer_count, 1);
        assert_eq!(metadata.consumer_count, 2);
        assert_eq!(metadata.outbound_connector_count, 0);
        assert!(!metadata.writable);
        assert_eq!(metadata.created_at, "2025-10-31T10:00:00.000Z");
        // A freshly registered record has never been updated.
        assert!(metadata.last_update.is_none());
    }

    /// Serialized JSON must contain the populated fields, including the
    /// timestamp set through `with_last_update`, and must omit `None`
    /// options such as an absent buffer capacity.
    #[test]
    fn test_record_metadata_serialization() {
        let type_id = TypeId::of::<String>();
        let metadata = RecordMetadata::new(
            RecordId::new(1),
            StringKey::new("app.config"),
            type_id,
            "String".to_string(),
            RecordOrigin::Passive,
            "single_latest".to_string(),
            None,
            1,
            1,
            true,
            "2025-10-31T10:00:00.000Z".to_string(),
            2,
        )
        .with_last_update("2025-10-31T12:00:00.000Z".to_string());

        let json = serde_json::to_string(&metadata).unwrap();
        assert!(json.contains("\"record_id\":1"));
        assert!(json.contains("\"record_key\":\"app.config\""));
        assert!(json.contains("\"name\":\"String\""));
        assert!(json.contains("\"origin\":\"passive\""));
        assert!(json.contains("\"buffer_type\":\"single_latest\""));
        assert!(json.contains("\"writable\":true"));
        assert!(json.contains("\"created_at\":\"2025-10-31T10:00:00.000Z\""));
        assert!(json.contains("\"last_update\":\"2025-10-31T12:00:00.000Z\""));
        assert!(json.contains("\"outbound_connector_count\":2"));
        // `buffer_capacity` is None, so skip_serializing_if must drop the key.
        assert!(!json.contains("\"buffer_capacity\""));
    }

    /// `with_last_update_opt` must store exactly what it is given: `Some`
    /// sets the timestamp, `None` clears it and removes the key from JSON.
    #[test]
    fn test_record_metadata_last_update_opt() {
        let metadata = RecordMetadata::new(
            RecordId::new(2),
            StringKey::new("sensor.temp"),
            TypeId::of::<u8>(),
            "u8".to_string(),
            RecordOrigin::Source,
            "spmc_ring".to_string(),
            Some(16),
            1,
            0,
            false,
            "2025-10-31T10:00:00.000Z".to_string(),
            0,
        )
        .with_last_update_opt(Some("2025-10-31T11:00:00.000Z".to_string()));
        assert_eq!(
            metadata.last_update.as_deref(),
            Some("2025-10-31T11:00:00.000Z")
        );

        let cleared = metadata.with_last_update_opt(None);
        assert!(cleared.last_update.is_none());

        let json = serde_json::to_string(&cleared).unwrap();
        assert!(!json.contains("\"last_update\""));
        // A present capacity is serialized as a plain number.
        assert!(json.contains("\"buffer_capacity\":16"));
    }
}