1use std::{borrow::Cow, fmt::Display, str::FromStr};
24
25use cfg_if::cfg_if;
26use serde::Serialize;
27
28use crate::{
29 error::Error,
30 interface::{name::InterfaceName, version::InterfaceVersion, MappingVec, Retention, Schema},
31 mapping::{
32 datastream::object::DatastreamObjectMapping, path::MappingPath, InterfaceMapping,
33 MappingError,
34 },
35 schema::{Aggregation, InterfaceJson, InterfaceType, Mapping, Ownership, Reliability},
36};
37
/// Error returned when the mappings of an object aggregated datastream are not consistent
/// with each other.
#[derive(Debug, thiserror::Error)]
pub enum ObjectError {
    /// A mapping sets one of the shared properties differently from the first mapping.
    #[error("object has a different {ctx} for the mapping {endpoint}")]
    Mapping {
        /// Name of the property that differs (for example `"reliability"`).
        ctx: &'static str,
        /// Endpoint of the mapping that differs from the first one.
        endpoint: String,
    },
    /// A mapping endpoint is not part of the same object as the first mapping.
    #[error("object has an inconsistent endpoint {endpoint}")]
    Endpoint {
        /// Endpoint of the mapping that does not match.
        endpoint: String,
    },
}
60
61impl ObjectError {
62 fn mapping(ctx: &'static str, endpoint: impl AsRef<str>) -> Self {
63 Self::Mapping {
64 ctx,
65 endpoint: endpoint.as_ref().to_string(),
66 }
67 }
68}
69
/// Datastream interface with object aggregation.
///
/// Reliability, explicit timestamp and retention are stored once for the whole object: when
/// converting from an [`InterfaceJson`] they are read from the first mapping, after checking
/// that every other mapping declares the same values.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct DatastreamObject {
    /// Name of the interface.
    name: InterfaceName,
    /// Major and minor version of the interface.
    version: InterfaceVersion,
    /// Ownership of the interface.
    ownership: Ownership,
    /// Reliability shared by all the mappings.
    reliability: Reliability,
    /// Explicit timestamp flag shared by all the mappings.
    explicit_timestamp: bool,
    /// Retention (with its expiry) shared by all the mappings.
    retention: Retention,
    /// Mappings of the object, in the order they were declared.
    mappings: MappingVec<DatastreamObjectMapping>,
    /// Database retention (with its TTL) shared by all the mappings.
    #[cfg(feature = "server-fields")]
    #[cfg_attr(docsrs, doc(cfg(feature = "server-fields")))]
    database_retention: crate::interface::DatabaseRetention,
    /// Optional description of the interface.
    #[cfg(feature = "doc-fields")]
    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
    description: Option<String>,
    /// Optional documentation of the interface.
    #[cfg(feature = "doc-fields")]
    #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
    doc: Option<String>,
}
92
93impl DatastreamObject {
94 #[must_use]
96 pub fn reliability(&self) -> Reliability {
97 self.reliability
98 }
99
100 #[must_use]
104 pub fn explicit_timestamp(&self) -> bool {
105 self.explicit_timestamp
106 }
107
108 #[must_use]
110 pub fn retention(&self) -> Retention {
111 self.retention
112 }
113 #[cfg(feature = "server-fields")]
115 #[cfg_attr(docsrs, doc(cfg(feature = "server-fields")))]
116 #[must_use]
117 pub fn database_retention(&self) -> crate::interface::DatabaseRetention {
118 self.database_retention
119 }
120
121 #[must_use]
123 pub fn is_object_path(&self, path: &MappingPath<'_>) -> bool {
124 let Some(mapping) = self.mappings.iter().next() else {
125 unreachable!("objects must have at least one mapping")
126 };
127
128 mapping.is_object_path(path)
129 }
130
131 #[must_use]
135 pub fn mapping(&self, path: &str) -> Option<&DatastreamObjectMapping> {
136 self.mappings
137 .iter()
138 .find(|mapping| mapping.eq_object_field(path))
139 }
140}
141
142impl Schema for DatastreamObject {
143 type Mapping = DatastreamObjectMapping;
144
145 fn name(&self) -> &str {
146 self.name.as_ref()
147 }
148
149 fn interface_name(&self) -> &InterfaceName {
150 &self.name
151 }
152
153 fn version_major(&self) -> i32 {
154 self.version.version_major()
155 }
156
157 fn version_minor(&self) -> i32 {
158 self.version.version_minor()
159 }
160
161 fn version(&self) -> InterfaceVersion {
162 self.version
163 }
164
165 fn interface_type(&self) -> InterfaceType {
166 InterfaceType::Datastream
167 }
168
169 fn ownership(&self) -> Ownership {
170 self.ownership
171 }
172
173 fn aggregation(&self) -> Aggregation {
174 Aggregation::Object
175 }
176
177 #[cfg(feature = "doc-fields")]
178 #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
179 fn description(&self) -> Option<&str> {
180 self.description.as_deref()
181 }
182
183 #[cfg(feature = "doc-fields")]
184 #[cfg_attr(docsrs, doc(cfg(feature = "doc-fields")))]
185 fn doc(&self) -> Option<&str> {
186 self.doc.as_deref()
187 }
188
189 fn iter_mappings(&self) -> impl Iterator<Item = &Self::Mapping> {
190 self.mappings.iter()
191 }
192
193 fn mappings_len(&self) -> usize {
194 self.mappings.len()
195 }
196
197 fn iter_interface_mappings(&self) -> impl Iterator<Item = Mapping<Cow<'_, str>>> {
198 self.iter_mappings().map(|mapping| {
199 cfg_if! {
200 if #[cfg(feature = "doc-fields")] {
201 let description = mapping.description().map(Cow::Borrowed);
202 let doc = mapping.doc().map(Cow::Borrowed);
203 } else {
204 let description = None;
205 let doc = None;
206 }
207 }
208
209 cfg_if! {
210 if #[cfg(feature = "server-fields")] {
211 let database_retention_policy = Some(self.database_retention.into());
212 let database_retention_ttl = self.database_retention.as_ttl_secs();
213 } else {
214 let database_retention_policy = None;
215 let database_retention_ttl = None;
216 }
217 }
218
219 Mapping {
220 endpoint: mapping.endpoint().to_string().into(),
221 mapping_type: mapping.mapping_type(),
222 reliability: Some(self.reliability),
223 explicit_timestamp: Some(self.explicit_timestamp),
224 retention: Some(self.retention.into()),
225 expiry: self.retention.as_expiry_seconds(),
226 allow_unset: None,
227 required: None,
228 database_retention_policy,
229 database_retention_ttl,
230 description,
231 doc,
232 }
233 })
234 }
235}
236
237impl Display for DatastreamObject {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 write!(f, "{}:{}", self.name, self.version)
240 }
241}
242
243impl<T> TryFrom<InterfaceJson<T>> for DatastreamObject
244where
245 T: AsRef<str> + Into<String>,
246{
247 type Error = Error;
248
249 fn try_from(value: InterfaceJson<T>) -> Result<Self, Self::Error> {
250 if value.interface_type != InterfaceType::Datastream
251 || value.aggregation != Some(Aggregation::Object)
252 {
253 return Err(Error::InterfaceConversion {
254 exp_type: InterfaceType::Datastream,
255 exp_aggregation: Aggregation::Object,
256 got_type: value.interface_type,
257 got_aggregation: value.aggregation.unwrap_or_default(),
258 });
259 }
260
261 let name = InterfaceName::from_str_ref(value.interface_name)?;
262 let version = InterfaceVersion::try_new(value.version_major, value.version_minor)?;
263
264 let first = value.mappings.first().ok_or(MappingError::Empty)?;
266
267 value
268 .mappings
269 .iter()
270 .skip(1)
271 .try_for_each(|mapping| are_mapping_compatible(first, mapping))?;
272
273 let reliability = first.reliability.unwrap_or_default();
275 let explicit_timestamp = first.explicit_timestamp.unwrap_or_default();
276 let retention = first.retention_with_expiry()?;
277 #[cfg(feature = "server-fields")]
278 let database_retention = first.database_retention_with_ttl()?;
279
280 let mappings = value
281 .mappings
282 .into_iter()
283 .map(|mapping| DatastreamObjectMapping::try_from(mapping).map_err(Error::Mapping))
284 .collect::<Result<Vec<_>, Error>>()?;
285
286 let first = mappings.first().ok_or(MappingError::Empty)?;
287
288 mappings.iter().skip(1).try_for_each(|other| {
289 if !first.is_same_object(other) {
290 return Err(Error::Object(ObjectError::Endpoint {
291 endpoint: other.endpoint().to_string(),
292 }));
293 }
294 Ok(())
295 })?;
296
297 let mappings = MappingVec::try_from(mappings)?;
298
299 Ok(Self {
300 name: name.into_string(),
301 version,
302 ownership: value.ownership,
303 reliability,
304 explicit_timestamp,
305 retention,
306 #[cfg(feature = "server-fields")]
307 database_retention,
308 mappings,
309 #[cfg(feature = "doc-fields")]
310 description: value.description.map(T::into),
311 #[cfg(feature = "doc-fields")]
312 doc: value.doc.map(T::into),
313 })
314 }
315}
316
317fn are_mapping_compatible<T>(a: &Mapping<T>, b: &Mapping<T>) -> Result<(), ObjectError>
318where
319 T: AsRef<str>,
320{
321 if a.reliability != b.reliability {
322 return Err(ObjectError::mapping("reliability", &b.endpoint));
323 }
324
325 if a.explicit_timestamp != b.explicit_timestamp {
326 return Err(ObjectError::mapping("explicit_timestamp", &b.endpoint));
327 }
328
329 if a.retention != b.retention {
330 return Err(ObjectError::mapping("retention", &b.endpoint));
331 }
332
333 if a.expiry != b.expiry {
334 return Err(ObjectError::mapping("expiry", &b.endpoint));
335 }
336
337 #[cfg(feature = "server-fields")]
338 {
339 if a.database_retention_policy != b.database_retention_policy {
340 return Err(ObjectError::mapping(
341 "database_retention_policy",
342 &b.endpoint,
343 ));
344 }
345
346 if a.database_retention_ttl != b.database_retention_ttl {
347 return Err(ObjectError::mapping("database_retention_ttl", &b.endpoint));
348 }
349 }
350
351 Ok(())
352}
353
354impl Serialize for DatastreamObject {
355 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
356 where
357 S: serde::Serializer,
358 {
359 InterfaceJson::from(self).serialize(serializer)
360 }
361}
362
363impl FromStr for DatastreamObject {
364 type Err = Error;
365
366 fn from_str(s: &str) -> Result<Self, Self::Err> {
367 let interface: InterfaceJson<Cow<str>> = serde_json::from_str(s)?;
368
369 Self::try_from(interface)
370 }
371}
372
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use pretty_assertions::assert_eq;

    use crate::interface::tests::E2E_DEVICE_AGGREGATE;
    use crate::schema::MappingType;
    use crate::Endpoint;

    use super::*;

    // Parses a complete interface with a single mapping and checks the parsed value, every
    // getter, the mapping lookups and the conversion back to the interface JSON mapping.
    #[test]
    fn should_parse_str() {
        let object = DatastreamObject::from_str(
            r#"{
                "interface_name": "com.example.Example",
                "version_major": 0,
                "version_minor": 1,
                "type": "datastream",
                "aggregation": "object",
                "ownership": "server",
                "description": "The description of the\tinterface",
                "doc": "The documentation of the\tinterface",
                "mappings": [{
                    "endpoint": "/prefix/path",
                    "type": "boolean",
                    "reliability": "unique",
                    "explicit_timestamp": true,
                    "retention": "stored",
                    "expiry": 30,
                    "database_retention_policy": "use_ttl",
                    "database_retention_ttl": 420,
                    "required": true,
                    "description": "The description of the\tmapping",
                    "doc": "The documentation of the\tmapping"
                }]
            }"#,
        )
        .unwrap();

        // Expected mapping: the per-object properties are not stored on the mapping itself.
        let exp_mapping = DatastreamObjectMapping {
            endpoint: Endpoint::try_from("/prefix/path").unwrap(),
            mapping_type: MappingType::Boolean,
            required: true,
            #[cfg(feature = "doc-fields")]
            description: Some("The description of the\tmapping".to_string()),
            #[cfg(feature = "doc-fields")]
            doc: Some("The documentation of the\tmapping".to_string()),
        };

        // Expected object: the shared properties come from the only mapping in the JSON.
        let exp = DatastreamObject {
            name: InterfaceName::try_from("com.example.Example".to_string()).unwrap(),
            version: InterfaceVersion::try_new(0, 1).unwrap(),
            ownership: Ownership::Server,
            reliability: Reliability::Unique,
            explicit_timestamp: true,
            retention: Retention::Stored {
                expiry: Some(Duration::from_secs(30)),
            },
            mappings: MappingVec::try_from(vec![exp_mapping.clone()]).unwrap(),
            #[cfg(feature = "server-fields")]
            database_retention: crate::interface::DatabaseRetention::UseTtl {
                ttl: Duration::from_secs(420),
            },
            #[cfg(feature = "doc-fields")]
            description: Some("The description of the\tinterface".to_string()),
            #[cfg(feature = "doc-fields")]
            doc: Some("The documentation of the\tinterface".to_string()),
        };

        assert_eq!(object, exp);

        // Getters must return the values stored in the fields.
        assert_eq!(object.name(), object.name.as_str());
        assert_eq!(*object.interface_name(), object.name);
        assert_eq!(object.version(), object.version);
        assert_eq!(object.version_major(), object.version.version_major());
        assert_eq!(object.version_minor(), object.version.version_minor());
        assert_eq!(object.ownership(), object.ownership);
        assert_eq!(object.retention(), object.retention);
        assert_eq!(object.reliability(), object.reliability);
        assert_eq!(object.explicit_timestamp(), object.explicit_timestamp);
        assert_eq!(object.interface_type(), InterfaceType::Datastream);
        assert_eq!(object.aggregation(), Aggregation::Object);
        #[cfg(feature = "server-fields")]
        assert_eq!(object.database_retention(), object.database_retention);
        #[cfg(feature = "doc-fields")]
        {
            assert_eq!(object.doc(), object.doc.as_deref());
            assert_eq!(object.description(), object.description.as_deref());
        }

        // The object path is the endpoint without the last level.
        let path = MappingPath::try_from("/prefix").unwrap();
        assert!(object.is_object_path(&path));

        // A mapping is looked up by the last level of its endpoint.
        assert_eq!(*object.mapping("path").unwrap(), exp_mapping);

        let mapping = object.iter_mappings().next().unwrap();
        assert_eq!(*mapping, exp_mapping);

        // Expected JSON mapping: the object properties are copied into it, and the fields
        // behind a disabled feature are `None`.
        let exp_interface_mapping = Mapping::<Cow<'_, str>> {
            endpoint: mapping.endpoint.to_string().into(),
            mapping_type: mapping.mapping_type,
            reliability: object.reliability.into(),
            explicit_timestamp: Some(object.explicit_timestamp),
            retention: Some(object.retention.into()),
            expiry: object.retention.as_expiry_seconds(),
            allow_unset: None,
            required: None,
            #[cfg(feature = "doc-fields")]
            description: exp_mapping.description.as_ref().map(Cow::from),
            #[cfg(feature = "doc-fields")]
            doc: exp_mapping.doc.as_ref().map(Cow::from),
            #[cfg(not(feature = "doc-fields"))]
            description: None,
            #[cfg(not(feature = "doc-fields"))]
            doc: None,
            #[cfg(feature = "server-fields")]
            database_retention_policy: Some(object.database_retention.into()),
            #[cfg(feature = "server-fields")]
            database_retention_ttl: object.database_retention.as_ttl_secs(),
            #[cfg(not(feature = "server-fields"))]
            database_retention_policy: None,
            #[cfg(not(feature = "server-fields"))]
            database_retention_ttl: None,
        };
        assert_eq!(
            object.iter_interface_mappings().next().unwrap(),
            exp_interface_mapping
        );

        // `Display` prints `<name>:<version>`.
        assert_eq!(object.to_string(), format!("{}:{}", exp.name, exp.version));
    }

    // Serializing and parsing again must give back an equal object, mapping order included.
    #[test]
    fn should_maintain_mapping_order_serde() {
        let original = DatastreamObject::from_str(E2E_DEVICE_AGGREGATE).unwrap();

        let serialized = serde_json::to_string(&original).unwrap();
        let deserialized = DatastreamObject::from_str(&serialized).unwrap();

        assert_eq!(deserialized, original);
    }

    // Two mappings with a different prefix must be rejected, reporting the second endpoint.
    #[test]
    fn should_check_same_object_endpoint() {
        let err = DatastreamObject::from_str(
            r#"{
                "interface_name": "com.example.Example",
                "version_major": 0,
                "version_minor": 1,
                "type": "datastream",
                "ownership": "server",
                "aggregation": "object",
                "mappings": [{
                    "endpoint": "/prefix/path",
                    "type": "boolean"
                },{
                    "endpoint": "/wrong/path",
                    "type": "boolean"
                }]
            }"#,
        )
        .unwrap_err();

        assert!(
            matches!(
                &err,
                Error::Object(ObjectError::Endpoint { endpoint } ) if endpoint == "/wrong/path"),
            "{err:?}"
        );
    }
}