1use core::any::TypeId;
4use serde::{Deserialize, Serialize};
5use std::string::String;
6
7use crate::graph::RecordOrigin;
8use crate::record_id::{RecordId, RecordKey};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct RecordMetadata {
19 pub record_id: u32,
21
22 pub record_key: String,
24
25 pub name: String,
27
28 pub type_id: String,
30
31 pub origin: RecordOrigin,
33
34 pub buffer_type: String,
36
37 #[serde(skip_serializing_if = "Option::is_none")]
39 pub buffer_capacity: Option<usize>,
40
41 pub producer_count: usize,
43
44 pub consumer_count: usize,
46
47 pub writable: bool,
49
50 pub created_at: String,
52
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub last_update: Option<String>,
56
57 pub outbound_connector_count: usize,
59
60 #[cfg(feature = "metrics")]
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub produced_count: Option<u64>,
65
66 #[cfg(feature = "metrics")]
68 #[serde(skip_serializing_if = "Option::is_none")]
69 pub consumed_count: Option<u64>,
70
71 #[cfg(feature = "metrics")]
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub dropped_count: Option<u64>,
75
76 #[cfg(feature = "metrics")]
78 #[serde(skip_serializing_if = "Option::is_none")]
79 pub occupancy: Option<(usize, usize)>,
80}
81
82impl RecordMetadata {
83 #[allow(clippy::too_many_arguments)]
99 pub fn new<K: RecordKey>(
100 record_id: RecordId,
101 record_key: K,
102 type_id: TypeId,
103 name: String,
104 origin: RecordOrigin,
105 buffer_type: String,
106 buffer_capacity: Option<usize>,
107 producer_count: usize,
108 consumer_count: usize,
109 writable: bool,
110 created_at: String,
111 outbound_connector_count: usize,
112 ) -> Self {
113 Self {
114 record_id: record_id.raw(),
115 record_key: record_key.as_str().to_string(),
116 name,
117 type_id: format!("{:?}", type_id),
118 origin,
119 buffer_type,
120 buffer_capacity,
121 producer_count,
122 consumer_count,
123 writable,
124 created_at,
125 last_update: None,
126 outbound_connector_count,
127 #[cfg(feature = "metrics")]
128 produced_count: None,
129 #[cfg(feature = "metrics")]
130 consumed_count: None,
131 #[cfg(feature = "metrics")]
132 dropped_count: None,
133 #[cfg(feature = "metrics")]
134 occupancy: None,
135 }
136 }
137
138 pub fn with_last_update(mut self, timestamp: String) -> Self {
140 self.last_update = Some(timestamp);
141 self
142 }
143
144 pub fn with_last_update_opt(mut self, timestamp: Option<String>) -> Self {
146 self.last_update = timestamp;
147 self
148 }
149
150 #[cfg(feature = "metrics")]
155 pub fn with_buffer_metrics(mut self, snapshot: crate::buffer::BufferMetricsSnapshot) -> Self {
156 self.produced_count = Some(snapshot.produced_count);
157 self.consumed_count = Some(snapshot.consumed_count);
158 self.dropped_count = Some(snapshot.dropped_count);
159 if snapshot.occupancy.1 > 0 {
161 self.occupancy = Some(snapshot.occupancy);
162 }
163 self
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170 use crate::record_id::StringKey;
171
172 #[test]
173 fn test_record_metadata_creation() {
174 let type_id = TypeId::of::<i32>();
175 let metadata = RecordMetadata::new(
176 RecordId::new(0),
177 StringKey::new("test.record"),
178 type_id,
179 "i32".to_string(),
180 RecordOrigin::Source,
181 "spmc_ring".to_string(),
182 Some(100),
183 1,
184 2,
185 false,
186 "2025-10-31T10:00:00.000Z".to_string(),
187 0,
188 );
189
190 assert_eq!(metadata.record_id, 0);
191 assert_eq!(metadata.record_key, "test.record");
192 assert_eq!(metadata.name, "i32");
193 assert!(matches!(metadata.origin, RecordOrigin::Source));
194 assert_eq!(metadata.buffer_type, "spmc_ring");
195 assert_eq!(metadata.buffer_capacity, Some(100));
196 assert_eq!(metadata.producer_count, 1);
197 assert_eq!(metadata.consumer_count, 2);
198 assert_eq!(metadata.outbound_connector_count, 0);
199 assert!(!metadata.writable);
200 }
201
202 #[test]
203 fn test_record_metadata_serialization() {
204 let type_id = TypeId::of::<String>();
205 let metadata = RecordMetadata::new(
206 RecordId::new(1),
207 StringKey::new("app.config"),
208 type_id,
209 "String".to_string(),
210 RecordOrigin::Passive,
211 "single_latest".to_string(),
212 None,
213 1,
214 1,
215 true,
216 "2025-10-31T10:00:00.000Z".to_string(),
217 2,
218 )
219 .with_last_update("2025-10-31T12:00:00.000Z".to_string());
220
221 let json = serde_json::to_string(&metadata).unwrap();
222 assert!(json.contains("\"record_id\":1"));
223 assert!(json.contains("\"record_key\":\"app.config\""));
224 assert!(json.contains("\"name\":\"String\""));
225 assert!(json.contains("\"origin\":\"passive\""));
226 assert!(json.contains("\"buffer_type\":\"single_latest\""));
227 assert!(json.contains("\"writable\":true"));
228 assert!(json.contains("\"outbound_connector_count\":2"));
229 }
230}