1use crate::{compress::CompressType, de::FogDeserializer, ser::FogSerializer, MAX_DOC_SIZE};
17use crate::{
18 element::serialize_elem,
19 error::{Error, Result},
20};
21use byteorder::{LittleEndian, ReadBytesExt};
22use fog_crypto::{
23 hash::{Hash, HashState},
24 identity::{Identity, IdentityKey},
25};
26use futures_core::{ready, FusedStream, Stream};
27use pin_project_lite::pin_project;
28use serde::{Deserialize, Serialize};
29use std::convert::TryInto;
30use std::{
31 convert::TryFrom,
32 fmt,
33 pin::Pin,
34 task::{Context, Poll},
35};
36
37pub fn get_doc_schema(doc: &[u8]) -> Result<Option<Hash>> {
40 let hash_raw = SplitDoc::split(doc)?.hash_raw;
41 if hash_raw.is_empty() {
42 Ok(None)
43 } else {
44 Ok(Some(hash_raw.try_into()?))
45 }
46}
47
/// Borrowed view over the wire-format regions of a raw encoded document, as
/// produced by [`SplitDoc::split`]: one compression byte, one hash-length
/// byte, the schema hash bytes, a 3-byte little-endian data length, the data
/// payload, and any trailing signature bytes.
pub(crate) struct SplitDoc<'a> {
    pub compress_raw: u8,         // raw compression marker byte
    pub hash_raw: &'a [u8],       // schema hash bytes (empty if no schema)
    pub data: &'a [u8],           // serialized data payload
    pub signature_raw: &'a [u8],  // trailing signature bytes (empty if unsigned)
}
65
66impl<'a> SplitDoc<'a> {
67 pub(crate) fn split(buf: &'a [u8]) -> Result<SplitDoc> {
68 let (&compress_raw, buf) = buf.split_first().ok_or(Error::LengthTooShort {
69 step: "get compress type",
70 actual: 0,
71 expected: 1,
72 })?;
73 let (hash_len, buf) = buf.split_first().ok_or(Error::LengthTooShort {
74 step: "get hash length",
75 actual: 0,
76 expected: 1,
77 })?;
78 let hash_len = *hash_len as usize;
79 if hash_len > 127 {
80 return Err(Error::BadHeader(format!(
81 "Hash length must be 0-127, marked as {}",
82 hash_len
83 )));
84 }
85 if buf.len() < hash_len + 3 {
86 return Err(Error::LengthTooShort {
87 step: "get hash then data length",
88 actual: buf.len(),
89 expected: hash_len + 3,
90 });
91 }
92 let (hash_raw, mut buf) = buf.split_at(hash_len);
93 let data_len = buf.read_u24::<LittleEndian>().unwrap() as usize; if data_len > buf.len() {
95 return Err(Error::LengthTooShort {
96 step: "get document data",
97 actual: buf.len(),
98 expected: data_len,
99 });
100 }
101 let (data, signature_raw) = buf.split_at(data_len);
102 Ok(Self {
103 compress_raw,
104 hash_raw,
105 data,
106 signature_raw,
107 })
108 }
109}
110
/// Shared internal state for [`NewDocument`] and [`Document`].
#[derive(Clone, Debug)]
struct DocumentInner {
    buf: Vec<u8>,              // complete encoded document: header + data + signature
    hash_state: HashState,     // running hash over schema marker + data (+ signature, once signed)
    schema_hash: Option<Hash>, // schema this document adheres to, if any
    doc_hash: Hash,            // hash of schema marker + data only; this is what gets signed
    this_hash: Hash,           // overall document hash, including any signature
    signer: Option<Identity>,  // identity of the most recent signer, if signed
    // Compression override requested via `compression()`; outer None = not set.
    // NOTE(review): inner Option presumably selects default/off/level — confirm
    // against the code that consumes `complete()`.
    set_compress: Option<Option<u8>>,
}
121
122impl DocumentInner {
123 fn signer(&self) -> Option<&Identity> {
124 self.signer.as_ref()
125 }
126
127 fn schema_hash(&self) -> Option<&Hash> {
129 self.schema_hash.as_ref()
130 }
131
132 fn compression(&mut self, setting: Option<u8>) -> &mut Self {
135 self.set_compress = Some(setting);
136 self
137 }
138
139 fn sign(mut self, key: &IdentityKey) -> Result<Self> {
142 let signature = key.sign(&self.doc_hash);
144 let new_len = if self.signer.is_some() {
145 self.buf.len() - self.split().signature_raw.len() + signature.size()
146 } else {
147 self.buf.len() + signature.size()
148 };
149 if new_len > MAX_DOC_SIZE {
150 return Err(Error::LengthTooLong {
151 max: MAX_DOC_SIZE,
152 actual: self.buf.len(),
153 });
154 }
155
156 if self.signer.is_some() {
158 let split = SplitDoc::split(&self.buf).unwrap();
159 let new_len = split.hash_raw.len() + split.data.len() + 5;
160 let mut hash_state = HashState::new();
161 match self.schema_hash {
162 None => hash_state.update([0u8]),
163 Some(ref hash) => hash_state.update(hash),
164 }
165 hash_state.update(split.data);
166 self.buf.resize(new_len, 0);
167 self.hash_state = hash_state;
168 }
169
170 let pre_len = self.buf.len();
172 signature.encode_vec(&mut self.buf);
173 self.hash_state.update(&self.buf[pre_len..]);
174 self.signer = Some(key.id().clone());
175 self.this_hash = self.hash_state.hash();
176 Ok(self)
177 }
178
179 fn hash(&self) -> &Hash {
181 &self.this_hash
182 }
183
184 fn split(&self) -> SplitDoc {
185 SplitDoc::split(&self.buf).unwrap()
186 }
187
188 fn data(&self) -> &[u8] {
189 self.split().data
190 }
191
192 fn complete(self) -> (Hash, Vec<u8>, Option<Option<u8>>) {
193 (self.this_hash, self.buf, self.set_compress)
194 }
195}
196
/// Shared state for the (sync and async) builders that pack a sequence of
/// items into size-bounded documents, each holding an array of items.
#[derive(Clone, Debug)]
struct VecDocumentInner {
    done: bool,                 // set once the final document has been produced
    ser: FogSerializer,         // accumulates serialized items for the current document
    item_buf: Vec<u8>,          // overflow: last serialized item, carried into the next document
    schema: Option<Hash>,       // schema hash to stamp on every produced document
    signer: Option<IdentityKey>, // key used to sign every produced document, if set
    set_compress: Option<Option<u8>>, // compression override applied to every produced document
}
206
impl VecDocumentInner {
    /// Fresh builder state with default serializer settings.
    fn new(schema: Option<&Hash>) -> Self {
        Self {
            done: false,
            ser: FogSerializer::default(),
            item_buf: Vec::new(),
            schema: schema.cloned(),
            signer: None,
            set_compress: None,
        }
    }

    /// Fresh builder state with the serializer's ordered flag set.
    // NOTE(review): `with_params(true)` presumably enables ordered map
    // serialization — confirm against FogSerializer.
    fn new_ordered(schema: Option<&Hash>) -> Self {
        Self {
            done: false,
            ser: FogSerializer::with_params(true),
            item_buf: Vec::new(),
            schema: schema.cloned(),
            signer: None,
            set_compress: None,
        }
    }

    /// Record a compression override for every produced document.
    fn compression(mut self, setting: Option<u8>) -> Self {
        self.set_compress = Some(setting);
        self
    }

    /// Record a key with which every produced document will be signed.
    fn sign(mut self, key: &IdentityKey) -> Self {
        self.signer = Some(key.clone());
        self
    }

    /// Maximum serialized payload to pack into one document: half the
    /// document size limit, minus the header (5 bytes + schema hash), the
    /// worst-case signature, and 4 bytes of slack.
    // NOTE(review): the final `- 4` presumably reserves room for the array
    // element header — confirm.
    fn data_len(&self) -> usize {
        let header_len = self.schema.as_ref().map_or(5, |h| 5 + h.as_ref().len());
        let sign_len = self.signer.as_ref().map_or(0, |k| k.max_signature_size());
        (MAX_DOC_SIZE >> 1) - header_len - sign_len - 4
    }

    /// Package the currently buffered items into a document.
    ///
    /// `data_len` is the payload budget, `prev_len` the buffer length before
    /// the last item was serialized, and `array_len` the number of items now
    /// in the buffer. If the buffer overshot the budget, the last item is
    /// moved into `item_buf` to seed the next document. Returns `Ok(None)`
    /// and marks the builder done when there is nothing left to package.
    fn next_doc(
        &mut self,
        data_len: usize,
        prev_len: usize,
        mut array_len: usize,
    ) -> Result<Option<NewDocument>> {
        if !self.ser.buf.is_empty() {
            if self.ser.buf.len() > data_len {
                // Over budget: peel off the most recent item for later.
                self.item_buf.extend_from_slice(&self.ser.buf[prev_len..]);
                self.ser.buf.truncate(prev_len);
                array_len -= 1;
            }
            // Wrap the buffered items in an array element header.
            let doc = NewDocument::new_from(self.schema.as_ref(), |mut buf| {
                serialize_elem(&mut buf, crate::element::Element::Array(array_len));
                buf.extend_from_slice(&self.ser.buf);
                Ok(buf)
            })?;
            let doc = match self.set_compress {
                Some(set_compress) => doc.compression(set_compress),
                None => doc,
            };
            let doc = match self.signer {
                Some(ref signer) => doc.sign(signer)?,
                None => doc,
            };
            // Restart the buffer with any carried-over item; if there is
            // none, this was the final document.
            self.ser.buf.clear();
            if !self.item_buf.is_empty() {
                self.ser.buf.extend_from_slice(&self.item_buf);
                self.item_buf.clear();
            } else {
                self.done = true;
            }
            Ok(Some(doc))
        } else {
            self.done = true;
            Ok(None)
        }
    }
}
294
/// Iterator adapter that packs a source iterator's serializable items into a
/// sequence of size-bounded [`NewDocument`]s, each holding an array of items.
#[derive(Clone, Debug)]
pub struct VecDocumentBuilder<I>
where
    I: Iterator,
    <I as Iterator>::Item: Serialize,
{
    iter: std::iter::Fuse<I>, // fused so exhaustion is observed exactly once
    inner: VecDocumentInner,  // shared packing state
}
313
impl<I> VecDocumentBuilder<I>
where
    I: Iterator,
    <I as Iterator>::Item: Serialize,
{
    /// Create a builder that packs `iter`'s items into documents, optionally
    /// stamped with a schema hash.
    pub fn new(iter: I, schema: Option<&Hash>) -> Self {
        Self {
            iter: iter.fuse(),
            inner: VecDocumentInner::new(schema),
        }
    }

    /// Like [`Self::new`], but uses the serializer's ordered mode.
    pub fn new_ordered(iter: I, schema: Option<&Hash>) -> Self {
        Self {
            iter: iter.fuse(),
            inner: VecDocumentInner::new_ordered(schema),
        }
    }

    /// Override the compression setting for every produced document.
    pub fn compression(mut self, setting: Option<u8>) -> Self {
        self.inner = self.inner.compression(setting);
        self
    }

    /// Sign every produced document with `key`.
    pub fn sign(mut self, key: &IdentityKey) -> Self {
        self.inner = self.inner.sign(key);
        self
    }

    /// Serialize items until the payload budget is exceeded (or the source
    /// runs dry), then package them into a document via the shared state.
    fn next_doc(&mut self) -> Result<Option<NewDocument>> {
        let data_len = self.inner.data_len();

        // Track the buffer length before the most recent item, so an
        // over-budget item can be peeled off for the next document.
        let mut prev_len = self.inner.ser.buf.len();
        let mut array_len = !self.inner.ser.buf.is_empty() as usize;
        while self.inner.ser.buf.len() <= data_len {
            let item = if let Some(item) = self.iter.next() {
                item
            } else {
                break;
            };
            prev_len = self.inner.ser.buf.len();
            item.serialize(&mut self.inner.ser)?;
            array_len += 1;
        }

        self.inner.next_doc(data_len, prev_len, array_len)
    }
}
376
377impl<I> Iterator for VecDocumentBuilder<I>
378where
379 I: Iterator,
380 <I as Iterator>::Item: Serialize,
381{
382 type Item = Result<NewDocument>;
383
384 fn next(&mut self) -> Option<Self::Item> {
385 if self.inner.done {
386 return None;
387 }
388 let result = self.next_doc();
389 if result.is_err() {
390 self.inner.done = true;
391 }
392 result.transpose()
393 }
394}
395
pin_project! {
    /// Stream adapter that packs a stream of serializable items into a
    /// sequence of size-bounded [`NewDocument`]s, each holding an array of
    /// items.
    #[must_use = "streams do nothing unless polled"]
    pub struct AsyncVecDocumentBuilder<St>
    where
        St: Stream,
        St::Item: Serialize,
    {
        #[pin]
        stream: St,
        inner: VecDocumentInner,
        // Number of items serialized into the in-progress array so far.
        array_len: usize,
    }
}
418
419impl<St> fmt::Debug for AsyncVecDocumentBuilder<St>
420where
421 St: Stream + fmt::Debug,
422 St::Item: Serialize + fmt::Debug,
423{
424 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
425 f.debug_struct("AsyncVecDocumentBuilder")
426 .field("stream", &self.stream)
427 .field("inner", &self.stream)
428 .field("array_len", &self.array_len)
429 .finish()
430 }
431}
432
433impl<St> AsyncVecDocumentBuilder<St>
434where
435 St: Stream,
436 St::Item: Serialize,
437{
438 pub fn new(stream: St, schema: Option<&Hash>) -> Self {
442 Self {
443 stream,
444 inner: VecDocumentInner::new(schema),
445 array_len: 0,
446 }
447 }
448
449 pub fn new_ordered(stream: St, schema: Option<&Hash>) -> Self {
457 Self {
458 stream,
459 inner: VecDocumentInner::new_ordered(schema),
460 array_len: 0,
461 }
462 }
463
464 pub fn compression(mut self, setting: Option<u8>) -> Self {
468 self.inner = self.inner.compression(setting);
469 self
470 }
471
472 pub fn sign(mut self, key: &IdentityKey) -> Self {
474 self.inner = self.inner.sign(key);
475 self
476 }
477}
478
479impl<St> FusedStream for AsyncVecDocumentBuilder<St>
480where
481 St: Stream + FusedStream,
482 St::Item: Serialize,
483{
484 fn is_terminated(&self) -> bool {
485 self.inner.done && self.stream.is_terminated()
486 }
487}
488
impl<St> Stream for AsyncVecDocumentBuilder<St>
where
    St: Stream,
    St::Item: Serialize,
{
    type Item = Result<NewDocument>;

    /// Pull items from the wrapped stream, serializing each one; whenever the
    /// buffer exceeds the payload budget, emit a packed document. On stream
    /// end, flush any remaining buffered items as a final document.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<NewDocument>>> {
        let mut this = self.project();
        if this.inner.done {
            return Poll::Ready(None);
        }
        Poll::Ready(loop {
            if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
                // Remember the pre-item length so an over-budget item can be
                // peeled off into the next document.
                let prev_len = this.inner.ser.buf.len();
                if let Err(e) = item.serialize(&mut this.inner.ser) {
                    this.inner.done = true;
                    break Some(Err(e));
                }
                *this.array_len += 1;

                let data_len = this.inner.data_len();
                if this.inner.ser.buf.len() > data_len {
                    // Budget exceeded: package the buffered items now.
                    let res = this.inner.next_doc(data_len, prev_len, *this.array_len);
                    // A carried-over item leaves one element in the buffer.
                    *this.array_len = !this.inner.ser.buf.is_empty() as usize;
                    if res.is_err() {
                        this.inner.done = true;
                    }
                    break res.transpose();
                }
            } else {
                // Stream exhausted: flush whatever is left, if anything.
                if !this.inner.ser.buf.is_empty() {
                    let data_len = this.inner.data_len();
                    let res =
                        this.inner
                            .next_doc(data_len, this.inner.ser.buf.len(), *this.array_len);
                    *this.array_len = !this.inner.ser.buf.is_empty() as usize;
                    this.inner.done = true;
                    break res.transpose();
                } else {
                    break None;
                }
            }
        })
    }
}
539
/// A locally built document that has not yet been validated against a schema.
/// Wraps the shared [`DocumentInner`] state.
#[derive(Clone, Debug)]
pub struct NewDocument(DocumentInner);
546
impl NewDocument {
    /// Build a document around an encoding closure.
    ///
    /// Writes the wire-format header (compression marker, schema hash length,
    /// schema hash, and a 3-byte data length placeholder), lets `encoder`
    /// append the data payload, then backfills the little-endian length and
    /// computes the document hash.
    ///
    /// # Errors
    /// Returns `Error::LengthTooLong` if the encoded document exceeds
    /// `MAX_DOC_SIZE`, or any error raised by `encoder`.
    fn new_from<F>(schema: Option<&Hash>, encoder: F) -> Result<Self>
    where
        F: FnOnce(Vec<u8>) -> Result<Vec<u8>>,
    {
        let mut buf: Vec<u8> = vec![CompressType::None.into()];
        if let Some(ref hash) = schema {
            let hash_len = hash.as_ref().len();
            // Wire format only allows hash lengths of 0-127.
            assert!(hash_len < 128);
            buf.push(hash_len as u8);
            buf.extend_from_slice(hash.as_ref());
        } else {
            // No schema: zero hash length.
            buf.push(0u8);
        }
        // Placeholder for the 3-byte little-endian data length.
        buf.extend_from_slice(&[0, 0, 0]);
        let start = buf.len();

        let mut buf = encoder(buf)?;

        if buf.len() > MAX_DOC_SIZE {
            return Err(Error::LengthTooLong {
                max: MAX_DOC_SIZE,
                actual: buf.len(),
            });
        }
        // Backfill the data length into the placeholder bytes.
        let data_len = (buf.len() - start).to_le_bytes();
        buf[start - 3] = data_len[0];
        buf[start - 2] = data_len[1];
        buf[start - 1] = data_len[2];

        // The document hash covers the schema hash (or a single 0 byte when
        // there is none) followed by the data payload.
        let mut hash_state = HashState::new();
        match schema {
            None => hash_state.update([0u8]),
            Some(hash) => hash_state.update(hash),
        }
        hash_state.update(&buf[start..]);
        let doc_hash = hash_state.hash();
        // Unsigned, so the overall hash equals the data hash.
        let this_hash = doc_hash.clone();

        Ok(NewDocument(DocumentInner {
            buf,
            hash_state,
            this_hash,
            schema_hash: schema.cloned(),
            doc_hash,
            set_compress: None,
            signer: None,
        }))
    }

    /// Create a new document from serializable data, optionally adhering to
    /// the given schema.
    pub fn new<S: Serialize>(schema: Option<&Hash>, data: S) -> Result<Self> {
        Self::new_from(schema, |buf| {
            let mut ser = FogSerializer::from_vec(buf, false);
            data.serialize(&mut ser)?;
            Ok(ser.finish())
        })
    }

    /// Like [`Self::new`], but with the serializer's ordered flag set.
    // NOTE(review): parameter order differs from `new` (data first) — confirm
    // this asymmetry is intended.
    pub fn new_ordered<S: Serialize>(data: S, schema: Option<&Hash>) -> Result<Self> {
        Self::new_from(schema, |buf| {
            let mut ser = FogSerializer::from_vec(buf, true);
            data.serialize(&mut ser)?;
            Ok(ser.finish())
        })
    }

    /// Hash of the schema this document adheres to, if any.
    pub fn schema_hash(&self) -> Option<&Hash> {
        self.0.schema_hash()
    }

    /// Override the compression setting used when the document is completed.
    pub fn compression(mut self, setting: Option<u8>) -> Self {
        self.0.compression(setting);
        self
    }

    /// Sign the document, replacing any existing signature.
    pub fn sign(self, key: &IdentityKey) -> Result<Self> {
        Ok(Self(self.0.sign(key)?))
    }

    /// The overall document hash, including any signature.
    pub fn hash(&self) -> &Hash {
        self.0.hash()
    }

    /// The raw serialized data payload (no header, no signature).
    pub(crate) fn data(&self) -> &[u8] {
        self.0.data()
    }
}
652
/// A parsed and verified document. Wraps the shared [`DocumentInner`] state.
#[derive(Clone, Debug)]
pub struct Document(DocumentInner);
660
impl Document {
    /// Convert a locally built [`NewDocument`] without re-verification.
    pub(crate) fn from_new(doc: NewDocument) -> Document {
        Self(doc.0)
    }

    /// Parse and verify a raw encoded document.
    ///
    /// Checks the overall size, splits the wire format, recomputes the
    /// document hashes, and verifies any trailing signature against the data
    /// hash.
    ///
    /// # Errors
    /// Returns `Error::LengthTooLong` on oversized input, a parse error from
    /// [`SplitDoc::split`] or hash decoding, or a signature verification
    /// failure.
    pub(crate) fn new(buf: Vec<u8>) -> Result<Self> {
        if buf.len() > MAX_DOC_SIZE {
            return Err(Error::LengthTooLong {
                max: MAX_DOC_SIZE,
                actual: buf.len(),
            });
        }

        let split = SplitDoc::split(&buf)?;
        let schema_hash = if !split.hash_raw.is_empty() {
            Some(Hash::try_from(split.hash_raw)?)
        } else {
            None
        };

        // Hash schema (or 0 marker) + data for the signable doc hash, then
        // fold in the signature bytes for the overall document hash.
        let mut hash_state = HashState::new();
        match schema_hash {
            None => hash_state.update([0u8]),
            Some(ref hash) => hash_state.update(hash.as_ref()),
        }
        hash_state.update(split.data);
        let doc_hash = hash_state.hash();
        hash_state.update(split.signature_raw);
        let this_hash = hash_state.hash();

        // A non-empty trailer must be a valid signature over doc_hash.
        let signer = if !split.signature_raw.is_empty() {
            let unverified =
                fog_crypto::identity::UnverifiedSignature::try_from(split.signature_raw)?;
            let verified = unverified.verify(&doc_hash)?;
            Some(verified.signer().clone())
        } else {
            None
        };

        Ok(Self(DocumentInner {
            buf,
            schema_hash,
            hash_state,
            this_hash,
            doc_hash,
            signer,
            set_compress: None,
        }))
    }

    /// The raw serialized data payload (no header, no signature).
    pub(crate) fn data(&self) -> &[u8] {
        self.0.data()
    }

    /// Collect all hashes embedded in the document's data payload.
    pub fn find_hashes(&self) -> Vec<Hash> {
        crate::find_hashes(self.data())
    }

    /// Hash of the schema this document adheres to, if any.
    pub fn schema_hash(&self) -> Option<&Hash> {
        self.0.schema_hash()
    }

    /// Identity that signed this document, if it is signed.
    pub fn signer(&self) -> Option<&Identity> {
        self.0.signer()
    }

    /// The overall document hash, including any signature.
    pub fn hash(&self) -> &Hash {
        self.0.hash()
    }

    /// Deserialize the document's data payload into `D`.
    pub fn deserialize<'de, D: Deserialize<'de>>(&'de self) -> Result<D> {
        let buf = self.0.data();
        let mut de = FogDeserializer::new(buf);
        D::deserialize(&mut de)
    }

    /// Override the compression setting used when the document is completed.
    pub fn compression(mut self, setting: Option<u8>) -> Self {
        self.0.compression(setting);
        self
    }

    /// Sign the document, replacing any existing signature.
    pub fn sign(self, key: &IdentityKey) -> Result<Self> {
        Ok(Self(self.0.sign(key)?))
    }

    /// Consume the document, yielding its hash, encoded bytes, and any
    /// compression override.
    pub(crate) fn complete(self) -> (Hash, Vec<u8>, Option<Option<u8>>) {
        self.0.complete()
    }
}
763
764#[cfg(test)]
765mod test {
766 use rand::Rng;
767 use std::mem;
768 use std::ops;
769
770 use super::*;
771
    // Building an unsigned, schema-less document yields the expected hash,
    // payload, and exact encoded bytes.
    #[test]
    fn create_new() {
        let new_doc = NewDocument::new(None, 1u8).unwrap();
        assert!(new_doc.schema_hash().is_none());
        // Doc hash covers the 0 schema-marker byte followed by the data byte.
        let expected_hash = Hash::new([0u8, 1u8]);
        assert_eq!(new_doc.hash(), &expected_hash);
        assert_eq!(new_doc.data(), &[1u8]);
        let expected = vec![0u8, 0u8, 1u8, 0u8, 0u8, 1u8];
        let (doc_hash, doc_vec, doc_compress) = Document::from_new(new_doc).complete();
        assert_eq!(doc_hash, expected_hash);
        assert_eq!(doc_vec, expected);
        assert_eq!(doc_compress, None);
    }
785
    // Parsing a hand-built encoding round-trips hash, payload, and bytes.
    #[test]
    fn create_doc() {
        let encoded = vec![0u8, 0u8, 1u8, 0u8, 0u8, 1u8];
        let doc = Document::new(encoded.clone()).unwrap();
        let expected_hash = Hash::new([0u8, 1u8]);
        assert_eq!(doc.hash(), &expected_hash);
        assert_eq!(doc.data(), &[1u8]);
        let val: u8 = doc.deserialize().unwrap();
        assert_eq!(val, 1u8);
        let (doc_hash, doc_vec, doc_compress) = doc.complete();
        assert_eq!(doc_hash, expected_hash);
        assert_eq!(doc_vec, encoded);
        assert_eq!(doc_compress, None);
    }
800
    // Exercise MAX_DOC_SIZE boundaries for schema-less documents, both
    // unsigned and signed.
    #[test]
    fn new_doc_limits() {
        use serde_bytes::Bytes;
        let vec = vec![0xAAu8; MAX_DOC_SIZE]; let key = IdentityKey::with_rng(&mut rand::rngs::OsRng);
        let sign_len = key.sign(&Hash::new(b"meh")).size();
        // Largest payload that still fits together with a signature; signing
        // must only append the signature bytes.
        let new_doc =
            NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 9 - sign_len)])).unwrap();
        let signed_doc = new_doc.clone().sign(&key).unwrap();
        assert_eq!(
            &signed_doc.0.buf[..(signed_doc.0.buf.len() - sign_len)],
            &new_doc.0.buf[..]
        );

        // Check the encoded header (compress byte, hash len, 3-byte data len)
        // at the two largest payload sizes that still succeed unsigned.
        let new_doc = NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 10)])).unwrap();
        let mut expected = vec![0x00, 0x00];
        expected.extend_from_slice(&(MAX_DOC_SIZE - 6).to_le_bytes()[..3]);
        assert_eq!(new_doc.0.buf[0..5], expected);
        let new_doc = NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 9)])).unwrap();
        let mut expected = vec![0x00, 0x00];
        expected.extend_from_slice(&(MAX_DOC_SIZE - 5).to_le_bytes()[..3]);
        assert_eq!(new_doc.0.buf[0..5], expected);
        // Signing the maximal unsigned document must fail, and larger
        // payloads must fail even unsigned.
        new_doc.sign(&key).unwrap_err(); NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 8)])).unwrap_err();
        NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 7)])).unwrap_err();
        NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 6)])).unwrap_err();
        NewDocument::new(None, Bytes::new(&vec[..(MAX_DOC_SIZE - 5)])).unwrap_err();
    }
835
    // Same boundary checks as `new_doc_limits`, but with a schema hash in
    // the header (header grows by the hash length).
    #[test]
    fn new_doc_schema_limits() {
        use serde_bytes::Bytes;
        let vec = vec![0xAAu8; MAX_DOC_SIZE]; let key = IdentityKey::with_rng(&mut rand::rngs::OsRng);
        let schema_hash = Hash::new(b"I'm totally a real schema, trust me");
        let hash_len = schema_hash.as_ref().len();
        let sign_len = key.sign(&Hash::new(b"meh")).size();
        // Largest payload that fits with schema + signature; signing must
        // only append the signature bytes.
        let new_doc = NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 9 - sign_len - hash_len)]),
        )
        .unwrap();
        let signed_doc = new_doc.clone().sign(&key).unwrap();
        assert_eq!(
            &signed_doc.0.buf[..(signed_doc.0.buf.len() - sign_len)],
            &new_doc.0.buf[..]
        );

        // Header check: compress byte, hash length, schema hash, 3-byte data
        // length, at the two largest unsigned-success sizes.
        let new_doc = NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 10 - hash_len)]),
        )
        .unwrap();
        let mut expected = vec![0x00, hash_len as u8];
        expected.extend_from_slice(schema_hash.as_ref());
        expected.extend_from_slice(&(MAX_DOC_SIZE - 6 - hash_len).to_le_bytes()[..3]);
        assert_eq!(new_doc.0.buf[0..(5 + hash_len)], expected);

        let new_doc = NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 9 - hash_len)]),
        )
        .unwrap();
        let mut expected = vec![0x00, hash_len as u8];
        expected.extend_from_slice(schema_hash.as_ref());
        expected.extend_from_slice(&(MAX_DOC_SIZE - 5 - hash_len).to_le_bytes()[..3]);
        assert_eq!(new_doc.0.buf[0..(5 + hash_len)], expected);
        // Signing the maximal unsigned document must fail, and anything
        // larger must fail even unsigned.
        new_doc.sign(&key).unwrap_err(); NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 8 - hash_len)]),
        )
        .unwrap_err();
        NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 7 - hash_len)]),
        )
        .unwrap_err();
        NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 6 - hash_len)]),
        )
        .unwrap_err();
        NewDocument::new(
            Some(&schema_hash),
            Bytes::new(&vec[..(MAX_DOC_SIZE - 5 - hash_len)]),
        )
        .unwrap_err();
    }
903
    // Sign a document, re-encode it, reparse it, and confirm the hash, data,
    // and signer identity survive the round trip.
    #[test]
    fn sign_roundtrip() {
        let key = IdentityKey::with_rng(&mut rand::rngs::OsRng);
        let new_doc = NewDocument::new(None, 1u8).unwrap().sign(&key).unwrap();
        assert_eq!(new_doc.data(), &[1u8]);
        let (doc_hash, doc_vec, _) = Document::from_new(new_doc).complete();
        let doc = Document::new(doc_vec).unwrap();
        let val: u8 = doc.deserialize().unwrap();
        assert_eq!(&doc_hash, doc.hash());
        assert_eq!(val, 1u8);
        assert_eq!(doc.signer().unwrap(), key.id());
    }
916
    // The sync builder over an infinite iterator should keep every produced
    // document within the size window.
    #[test]
    fn vec_document_encode() {
        #[derive(Clone, Serialize)]
        struct Example {
            a: u32,
            b: String,
        }

        let mut builder = VecDocumentBuilder::new(
            std::iter::repeat(Example {
                a: 234235,
                b: "Ok".into(),
            }),
            None,
        );
        let mut docs = Vec::new();
        for _ in 0..4 {
            let iter = builder.next();
            let result = iter.unwrap();
            let doc = result.unwrap();
            docs.push(doc);
        }
        // Each document lands between a quarter and half of the max size.
        assert!(docs.iter().all(|doc| {
            let len = doc.0.buf.len();
            len <= (MAX_DOC_SIZE >> 1) && len > (MAX_DOC_SIZE >> 2)
        }));
    }
944
    // Drain a finite iterator fully: all but the final document should fall
    // within the size window, and the final one must not be empty.
    #[test]
    fn vec_document_encode_all() {
        #[derive(Clone, Serialize)]
        struct Example {
            a: u32,
            b: String,
        }

        let iter = std::iter::repeat(Example {
            a: 23456,
            b: "Ok".into(),
        })
        .take(MAX_DOC_SIZE + 12);
        let builder = VecDocumentBuilder::new(iter, None);
        let docs = builder.collect::<Result<Vec<NewDocument>>>().unwrap();
        assert!(docs.iter().take(docs.len() - 1).all(|doc| {
            let len = doc.0.buf.len();
            len <= (MAX_DOC_SIZE >> 1) && len > (MAX_DOC_SIZE >> 2)
        }));
        assert!(!docs.last().unwrap().data().is_empty());
    }
966
    /// Helper trait for producing random test fixtures.
    pub trait Generate {
        fn generate<R: Rng>(rng: &mut R) -> Self;
    }

    impl Generate for () {
        fn generate<R: Rng>(_: &mut R) -> Self {}
    }

    impl Generate for bool {
        fn generate<R: Rng>(rng: &mut R) -> Self {
            rng.gen_bool(0.5)
        }
    }

    // Implement Generate for any type that rand can sample directly.
    macro_rules! impl_generate {
        ($ty:ty) => {
            impl Generate for $ty {
                fn generate<R: Rng>(rng: &mut R) -> Self {
                    rng.gen()
                }
            }
        };
    }

    impl_generate!(u8);
    impl_generate!(u16);
    impl_generate!(u32);
    impl_generate!(u64);
    impl_generate!(u128);
    impl_generate!(usize);
    impl_generate!(i8);
    impl_generate!(i16);
    impl_generate!(i32);
    impl_generate!(i64);
    impl_generate!(i128);
    impl_generate!(isize);
    impl_generate!(f32);
    impl_generate!(f64);
1005
    // Recursively implement Generate for tuples up to 12 elements.
    macro_rules! impl_tuple {
        () => {};
        ($first:ident, $($rest:ident,)*) => {
            impl<$first: Generate, $($rest: Generate,)*> Generate for ($first, $($rest,)*) {
                fn generate<R: Rng>(rng: &mut R) -> Self {
                    ($first::generate(rng), $($rest::generate(rng),)*)
                }
            }

            impl_tuple!($($rest,)*);
        };
    }

    impl_tuple!(T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11,);
1020
    // Recursively implement Generate for arrays of length 0-31, filling the
    // array element-by-element through MaybeUninit.
    macro_rules! impl_array {
        () => {};
        ($len:literal, $($rest:literal,)*) => {
            impl<T: Generate> Generate for [T; $len] {
                fn generate<R: Rng>(rng: &mut R) -> Self {
                    let mut result = mem::MaybeUninit::<Self>::uninit();
                    let result_ptr = result.as_mut_ptr().cast::<T>();
                    #[allow(clippy::reversed_empty_ranges)]
                    for i in 0..$len {
                        // SAFETY: i < $len, so the write is within the array.
                        unsafe {
                            result_ptr.add(i).write(T::generate(rng));
                        }
                    }
                    // SAFETY: all $len elements were initialized above.
                    unsafe {
                        result.assume_init()
                    }
                }
            }

            impl_array!($($rest,)*);
        }
    }

    impl_array!(
        31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9,
        8, 7, 6, 5, 4, 3, 2, 1, 0,
    );
1048
    // 50/50 Some/None.
    impl<T: Generate> Generate for Option<T> {
        fn generate<R: Rng>(rng: &mut R) -> Self {
            if rng.gen_bool(0.5) {
                Some(T::generate(rng))
            } else {
                None
            }
        }
    }
1058
    // Build a Vec of random fixtures, with a length drawn from `range`.
    pub fn generate_vec<R: Rng, T: Generate>(rng: &mut R, range: ops::Range<usize>) -> Vec<T> {
        let len = rng.gen_range(range);
        let mut result = Vec::with_capacity(len);
        for _ in 0..len {
            result.push(T::generate(rng));
        }
        result
    }
1067
    // Fixture: a 4-byte IPv4-style address.
    #[derive(Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
    pub struct Address {
        pub x0: u8,
        pub x1: u8,
        pub x2: u8,
        pub x3: u8,
    }
1075
    impl Generate for Address {
        fn generate<R: Rng>(rand: &mut R) -> Self {
            Self {
                x0: rand.gen(),
                x1: rand.gen(),
                x2: rand.gen(),
                x3: rand.gen(),
            }
        }
    }
1086
    // Fixture: an HTTP access-log-style record used for bulk encode/decode tests.
    #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
    pub struct Log {
        pub address: Address,
        pub identity: String,
        pub userid: String,
        pub date: String,
        pub request: String,
        pub code: u16,
        pub size: u64,
    }
1097
    // Generate a plausible-looking random access-log entry from fixed pools
    // of user ids, dates, status codes, methods, routes, and protocols.
    impl Generate for Log {
        fn generate<R: Rng>(rand: &mut R) -> Self {
            const USERID: [&str; 9] = [
                "-", "alice", "bob", "carmen", "david", "eric", "frank", "george", "harry",
            ];
            const MONTHS: [&str; 12] = [
                "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
            ];
            const TIMEZONE: [&str; 25] = [
                "-1200", "-1100", "-1000", "-0900", "-0800", "-0700", "-0600", "-0500", "-0400",
                "-0300", "-0200", "-0100", "+0000", "+0100", "+0200", "+0300", "+0400", "+0500",
                "+0600", "+0700", "+0800", "+0900", "+1000", "+1100", "+1200",
            ];
            let date = format!(
                "{}/{}/{}:{}:{}:{} {}",
                rand.gen_range(1..29),
                MONTHS[rand.gen_range(0..12)],
                rand.gen_range(1970..2022),
                rand.gen_range(0..24),
                rand.gen_range(0..60),
                rand.gen_range(0..60),
                TIMEZONE[rand.gen_range(0..25)],
            );
            const CODES: [u16; 63] = [
                100, 101, 102, 103, 200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 300, 301,
                302, 303, 304, 305, 306, 307, 308, 400, 401, 402, 403, 404, 405, 406, 407, 408,
                409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 421, 422, 423, 424, 425, 426,
                428, 429, 431, 451, 500, 501, 502, 503, 504, 505, 506, 507, 508, 510, 511,
            ];
            const METHODS: [&str; 5] = ["GET", "POST", "PUT", "UPDATE", "DELETE"];
            const ROUTES: [&str; 7] = [
                "/favicon.ico",
                "/css/index.css",
                "/css/font-awsome.min.css",
                "/img/logo-full.svg",
                "/img/splash.jpg",
                "/api/login",
                "/api/logout",
            ];
            const PROTOCOLS: [&str; 4] = ["HTTP/1.0", "HTTP/1.1", "HTTP/2", "HTTP/3"];
            let request = format!(
                "{} {} {}",
                METHODS[rand.gen_range(0..5)],
                ROUTES[rand.gen_range(0..7)],
                PROTOCOLS[rand.gen_range(0..4)],
            );
            Self {
                address: Address::generate(rand),
                identity: "-".into(),
                userid: USERID[rand.gen_range(0..USERID.len())].into(),
                date,
                request,
                code: CODES[rand.gen_range(0..CODES.len())],
                size: rand.gen_range(0..100_000_000),
            }
        }
    }
1155
    // Bulk-encode 10k random logs via the sync builder; every non-final
    // document must parse and fall within the size window.
    #[test]
    fn logs_encode() {
        let mut rng = rand::thread_rng();
        const LOGS: usize = 10_000;
        let logs = generate_vec::<_, Log>(&mut rng, LOGS..LOGS + 1);

        let builder = VecDocumentBuilder::new(logs.iter(), None);
        let docs = builder.collect::<Result<Vec<NewDocument>>>().unwrap();
        for (index, doc) in docs.iter().enumerate() {
            let mut parser = crate::element::Parser::with_debug(doc.data(), "    ");
            for x in &mut parser {
                x.unwrap();
            }
            println!("Doc #{}: \n{}", index, parser.get_debug().unwrap());
        }
        assert!(docs.iter().take(docs.len() - 1).all(|doc| {
            let len = doc.0.buf.len();
            len <= (MAX_DOC_SIZE >> 1) && len > (MAX_DOC_SIZE >> 2)
        }));
    }
1178
    // Round trip: encode 10k random logs with the sync builder, validate each
    // document, decode, and compare against the originals.
    #[test]
    fn logs_decode() {
        let mut rng = rand::thread_rng();
        const LOGS: usize = 10_000;
        let logs = generate_vec::<_, Log>(&mut rng, LOGS..LOGS + 1);

        let builder = VecDocumentBuilder::new(logs.iter(), None);
        let mut docs = builder.collect::<Result<Vec<NewDocument>>>().unwrap();

        let docs: Vec<Document> = docs
            .drain(0..)
            .map(|doc| crate::schema::NoSchema::validate_new_doc(doc).unwrap())
            .collect();
        let dec_logs: Vec<Log> = docs
            .iter()
            .flat_map(|doc| doc.deserialize::<Vec<Log>>().unwrap())
            .collect();
        assert!(dec_logs == logs, "Didn't decode identically")
    }
1200
    // Same as `logs_encode`, but driving the async stream builder with a
    // blocking executor.
    #[test]
    fn async_logs_encode() {
        let mut rng = rand::thread_rng();
        const LOGS: usize = 20_000;
        let logs = generate_vec::<_, Log>(&mut rng, LOGS..LOGS + 1);

        let mut builder =
            AsyncVecDocumentBuilder::new(futures_util::stream::iter(logs.iter()), None);
        use futures_util::StreamExt;
        let docs = futures_executor::block_on(async {
            let mut docs = Vec::new();
            while let Some(result) = builder.next().await {
                match result {
                    Ok(doc) => docs.push(doc),
                    Err(e) => return Err(e),
                }
            }
            Ok(docs)
        })
        .unwrap();
        for (index, doc) in docs.iter().enumerate() {
            let mut parser = crate::element::Parser::with_debug(doc.data(), "    ");
            for x in &mut parser {
                x.unwrap();
            }
            println!("Doc #{}: \n{}", index, parser.get_debug().unwrap());
        }
        println!("A total of {} documents", docs.len());
        assert!(docs.iter().take(docs.len() - 1).all(|doc| {
            let len = doc.0.buf.len();
            len <= (MAX_DOC_SIZE >> 1) && len > (MAX_DOC_SIZE >> 2)
        }));
    }
1236
    // Round trip through the async builder: encode 20k random logs, validate,
    // decode, and compare against the originals.
    #[test]
    fn async_logs_decode() {
        let mut rng = rand::thread_rng();
        const LOGS: usize = 20_000;
        let logs = generate_vec::<_, Log>(&mut rng, LOGS..LOGS + 1);

        let mut builder =
            AsyncVecDocumentBuilder::new(futures_util::stream::iter(logs.iter()), None);
        use futures_util::StreamExt;
        let mut docs = futures_executor::block_on(async {
            let mut docs = Vec::new();
            while let Some(result) = builder.next().await {
                match result {
                    Ok(doc) => docs.push(doc),
                    Err(e) => return Err(e),
                }
            }
            Ok(docs)
        })
        .unwrap();

        let docs: Vec<Document> = docs
            .drain(0..)
            .map(|doc| crate::schema::NoSchema::validate_new_doc(doc).unwrap())
            .collect();
        let dec_logs: Vec<Log> = docs
            .iter()
            .map(|doc| doc.deserialize::<Vec<Log>>().unwrap())
            .flat_map(|doc| {
                println!("Document item count = {}", doc.len());
                doc
            })
            .collect();
        println!("We have a total of {} logs", dec_logs.len());
        println!("We expected a total of {} logs", logs.len());
        assert!(dec_logs == logs, "Didn't decode identically")
    }
1277}