1use core::any::TypeId;
4use serde::{Deserialize, Serialize};
5use std::string::String;
6
7use crate::graph::RecordOrigin;
8use crate::record_id::{RecordId, RecordKey};
9
/// Serializable description of a single record: identity, buffer
/// configuration, producer/consumer wiring and (feature-gated) runtime
/// statistics.
///
/// All identifier-like values are stored as plain integers/strings so the
/// struct can be serialized and deserialized with serde. Every `Option` field
/// is omitted from serialized output while it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordMetadata {
    /// Raw numeric value of the record's `RecordId` (`RecordId::raw()`).
    pub record_id: u32,

    /// String form of the record's key (`RecordKey::as_str()`).
    pub record_key: String,

    /// Caller-supplied name for the record.
    /// NOTE(review): the tests pass Rust type names ("i32", "String") here —
    /// presumably this is the record's type name; confirm against callers.
    pub name: String,

    /// `Debug` rendering of the record's `TypeId`.
    /// NOTE(review): the `Debug` output of `TypeId` is opaque — consumers
    /// should presumably treat this as an opaque token; confirm.
    pub type_id: String,

    /// Origin classification of the record (see `RecordOrigin`).
    pub origin: RecordOrigin,

    /// Free-form identifier of the buffer kind backing the record
    /// (the tests use values such as "spmc_ring" and "single_latest").
    pub buffer_type: String,

    /// Buffer capacity, when one is given; omitted from serialized output
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub buffer_capacity: Option<usize>,

    /// Number of producers attached to the record.
    pub producer_count: usize,

    /// Number of consumers attached to the record.
    pub consumer_count: usize,

    /// Writable flag as supplied by the caller.
    pub writable: bool,

    /// Creation timestamp as a string.
    /// NOTE(review): the tests use an ISO-8601 / RFC 3339 UTC literal —
    /// confirm that this is the required format.
    pub created_at: String,

    /// Timestamp of the most recent update, if any; starts as `None` and is
    /// set through `with_last_update` / `with_last_update_opt`. Omitted from
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_update: Option<String>,

    /// Number of outbound connectors attached to the record.
    pub outbound_connector_count: usize,

    /// Produced-item counter copied from a buffer metrics snapshot.
    /// Only compiled with the `metrics` feature; `None` until
    /// `with_buffer_metrics` is called.
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub produced_count: Option<u64>,

    /// Consumed-item counter copied from a buffer metrics snapshot.
    /// Only compiled with the `metrics` feature; `None` until
    /// `with_buffer_metrics` is called.
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub consumed_count: Option<u64>,

    /// Dropped-item counter copied from a buffer metrics snapshot.
    /// Only compiled with the `metrics` feature; `None` until
    /// `with_buffer_metrics` is called.
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub dropped_count: Option<u64>,

    /// Occupancy pair copied from a buffer metrics snapshot; it is only
    /// stored when the pair's second element is greater than zero.
    /// NOTE(review): presumably `(items currently buffered, capacity)` —
    /// confirm against `BufferMetricsSnapshot`.
    /// Only compiled with the `metrics` feature.
    #[cfg(feature = "metrics")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub occupancy: Option<(usize, usize)>,

    /// Per-stage profiling entries; only stored when a non-empty list is
    /// supplied via `with_stage_profiling`.
    /// Only compiled with the `profiling` feature.
    #[cfg(feature = "profiling")]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stage_profiling: Option<std::vec::Vec<crate::profiling::StageProfilingInfo>>,
}
88
89impl RecordMetadata {
90 #[allow(clippy::too_many_arguments)]
106 pub fn new<K: RecordKey>(
107 record_id: RecordId,
108 record_key: K,
109 type_id: TypeId,
110 name: String,
111 origin: RecordOrigin,
112 buffer_type: String,
113 buffer_capacity: Option<usize>,
114 producer_count: usize,
115 consumer_count: usize,
116 writable: bool,
117 created_at: String,
118 outbound_connector_count: usize,
119 ) -> Self {
120 Self {
121 record_id: record_id.raw(),
122 record_key: record_key.as_str().to_string(),
123 name,
124 type_id: format!("{:?}", type_id),
125 origin,
126 buffer_type,
127 buffer_capacity,
128 producer_count,
129 consumer_count,
130 writable,
131 created_at,
132 last_update: None,
133 outbound_connector_count,
134 #[cfg(feature = "metrics")]
135 produced_count: None,
136 #[cfg(feature = "metrics")]
137 consumed_count: None,
138 #[cfg(feature = "metrics")]
139 dropped_count: None,
140 #[cfg(feature = "metrics")]
141 occupancy: None,
142 #[cfg(feature = "profiling")]
143 stage_profiling: None,
144 }
145 }
146
147 pub fn with_last_update(mut self, timestamp: String) -> Self {
149 self.last_update = Some(timestamp);
150 self
151 }
152
153 pub fn with_last_update_opt(mut self, timestamp: Option<String>) -> Self {
155 self.last_update = timestamp;
156 self
157 }
158
159 #[cfg(feature = "metrics")]
164 pub fn with_buffer_metrics(mut self, snapshot: crate::buffer::BufferMetricsSnapshot) -> Self {
165 self.produced_count = Some(snapshot.produced_count);
166 self.consumed_count = Some(snapshot.consumed_count);
167 self.dropped_count = Some(snapshot.dropped_count);
168 if snapshot.occupancy.1 > 0 {
170 self.occupancy = Some(snapshot.occupancy);
171 }
172 self
173 }
174
175 #[cfg(feature = "profiling")]
177 pub fn with_stage_profiling(
178 mut self,
179 stages: std::vec::Vec<crate::profiling::StageProfilingInfo>,
180 ) -> Self {
181 if !stages.is_empty() {
182 self.stage_profiling = Some(stages);
183 }
184 self
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::record_id::StringKey;

    /// Creation timestamp literal shared by the tests below.
    const CREATED_AT: &str = "2025-10-31T10:00:00.000Z";

    /// `new` must store every argument in the matching field.
    #[test]
    fn test_record_metadata_creation() {
        let int_type = TypeId::of::<i32>();
        let meta = RecordMetadata::new(
            RecordId::new(0),
            StringKey::new("test.record"),
            int_type,
            "i32".to_string(),
            RecordOrigin::Source,
            "spmc_ring".to_string(),
            Some(100),
            1,
            2,
            false,
            CREATED_AT.to_string(),
            0,
        );

        // Identity.
        assert_eq!(meta.record_id, 0);
        assert_eq!(meta.record_key, "test.record");
        assert_eq!(meta.name, "i32");
        assert!(matches!(meta.origin, RecordOrigin::Source));

        // Buffer configuration.
        assert_eq!(meta.buffer_type, "spmc_ring");
        assert_eq!(meta.buffer_capacity, Some(100));

        // Wiring and flags.
        assert_eq!(meta.producer_count, 1);
        assert_eq!(meta.consumer_count, 2);
        assert_eq!(meta.outbound_connector_count, 0);
        assert!(!meta.writable);
    }

    /// The JSON form must contain the expected key/value fragments.
    #[test]
    fn test_record_metadata_serialization() {
        let string_type = TypeId::of::<String>();
        let meta = RecordMetadata::new(
            RecordId::new(1),
            StringKey::new("app.config"),
            string_type,
            "String".to_string(),
            RecordOrigin::Passive,
            "single_latest".to_string(),
            None,
            1,
            1,
            true,
            CREATED_AT.to_string(),
            2,
        )
        .with_last_update("2025-10-31T12:00:00.000Z".to_string());

        let json = serde_json::to_string(&meta).unwrap();

        let expected_fragments = [
            "\"record_id\":1",
            "\"record_key\":\"app.config\"",
            "\"name\":\"String\"",
            "\"origin\":\"passive\"",
            "\"buffer_type\":\"single_latest\"",
            "\"writable\":true",
            "\"outbound_connector_count\":2",
        ];
        for fragment in expected_fragments.iter() {
            assert!(
                json.contains(*fragment),
                "serialized metadata is missing {}: {}",
                fragment,
                json
            );
        }
    }
}