1use crate::connection::ConnectionManager;
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
15#[derive(Debug, Clone)]
16pub enum Update<T> {
17 Upsert { key: String, data: T },
18 Patch { key: String, data: T },
19 Delete { key: String },
20}
21
22#[derive(Debug, Clone)]
23pub enum RichUpdate<T> {
24 Created {
25 key: String,
26 data: T,
27 },
28 Updated {
29 key: String,
30 before: T,
31 after: T,
32 patch: Option<serde_json::Value>,
33 },
34 Deleted {
35 key: String,
36 last_known: Option<T>,
37 },
38}
39
40impl<T> Update<T> {
41 pub fn key(&self) -> &str {
42 match self {
43 Update::Upsert { key, .. } => key,
44 Update::Patch { key, .. } => key,
45 Update::Delete { key } => key,
46 }
47 }
48
49 pub fn data(&self) -> Option<&T> {
50 match self {
51 Update::Upsert { data, .. } => Some(data),
52 Update::Patch { data, .. } => Some(data),
53 Update::Delete { .. } => None,
54 }
55 }
56
57 pub fn is_delete(&self) -> bool {
58 matches!(self, Update::Delete { .. })
59 }
60
61 pub fn into_data(self) -> Option<T> {
62 match self {
63 Update::Upsert { data, .. } => Some(data),
64 Update::Patch { data, .. } => Some(data),
65 Update::Delete { .. } => None,
66 }
67 }
68
69 pub fn has_data(&self) -> bool {
70 matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71 }
72
73 pub fn into_key(self) -> String {
74 match self {
75 Update::Upsert { key, .. } => key,
76 Update::Patch { key, .. } => key,
77 Update::Delete { key } => key,
78 }
79 }
80
81 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82 match self {
83 Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84 Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85 Update::Delete { key } => Update::Delete { key },
86 }
87 }
88}
89
90impl<T> RichUpdate<T> {
91 pub fn key(&self) -> &str {
92 match self {
93 RichUpdate::Created { key, .. } => key,
94 RichUpdate::Updated { key, .. } => key,
95 RichUpdate::Deleted { key, .. } => key,
96 }
97 }
98
99 pub fn data(&self) -> Option<&T> {
100 match self {
101 RichUpdate::Created { data, .. } => Some(data),
102 RichUpdate::Updated { after, .. } => Some(after),
103 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104 }
105 }
106
107 pub fn before(&self) -> Option<&T> {
108 match self {
109 RichUpdate::Created { .. } => None,
110 RichUpdate::Updated { before, .. } => Some(before),
111 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112 }
113 }
114
115 pub fn into_data(self) -> Option<T> {
116 match self {
117 RichUpdate::Created { data, .. } => Some(data),
118 RichUpdate::Updated { after, .. } => Some(after),
119 RichUpdate::Deleted { last_known, .. } => last_known,
120 }
121 }
122
123 pub fn is_created(&self) -> bool {
124 matches!(self, RichUpdate::Created { .. })
125 }
126
127 pub fn is_updated(&self) -> bool {
128 matches!(self, RichUpdate::Updated { .. })
129 }
130
131 pub fn is_deleted(&self) -> bool {
132 matches!(self, RichUpdate::Deleted { .. })
133 }
134
135 pub fn patch(&self) -> Option<&serde_json::Value> {
136 match self {
137 RichUpdate::Updated { patch, .. } => patch.as_ref(),
138 _ => None,
139 }
140 }
141
142 pub fn has_patch_field(&self, field: &str) -> bool {
143 self.patch()
144 .and_then(|p| p.as_object())
145 .map(|obj| obj.contains_key(field))
146 .unwrap_or(false)
147 }
148}
149
150#[derive(Clone)]
151pub enum KeyFilter {
152 None,
153 Single(String),
154 Multiple(HashSet<String>),
155}
156
157impl KeyFilter {
158 fn matches(&self, key: &str) -> bool {
159 match self {
160 KeyFilter::None => true,
161 KeyFilter::Single(k) => k == key,
162 KeyFilter::Multiple(keys) => keys.contains(key),
163 }
164 }
165}
166
167pub struct EntityStream<T> {
168 state: EntityStreamState<T>,
169 view: String,
170 key_filter: KeyFilter,
171 _marker: PhantomData<T>,
172}
173
174enum EntityStreamState<T> {
175 Lazy {
176 connection: ConnectionManager,
177 store: SharedStore,
178 subscription_view: String,
179 subscription_key: Option<String>,
180 take: Option<u32>,
181 skip: Option<u32>,
182 },
183 Active {
184 inner: BroadcastStream<StoreUpdate>,
185 },
186 Subscribing {
187 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
188 inner: BroadcastStream<StoreUpdate>,
189 },
190 Invalid,
191 _Phantom(PhantomData<T>),
192}
193
194impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
195 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
196 Self {
197 state: EntityStreamState::Active {
198 inner: BroadcastStream::new(rx),
199 },
200 view,
201 key_filter: KeyFilter::None,
202 _marker: PhantomData,
203 }
204 }
205
206 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
207 Self {
208 state: EntityStreamState::Active {
209 inner: BroadcastStream::new(rx),
210 },
211 view,
212 key_filter: KeyFilter::Single(key),
213 _marker: PhantomData,
214 }
215 }
216
217 pub fn new_multi_filtered(
218 rx: broadcast::Receiver<StoreUpdate>,
219 view: String,
220 keys: HashSet<String>,
221 ) -> Self {
222 Self {
223 state: EntityStreamState::Active {
224 inner: BroadcastStream::new(rx),
225 },
226 view,
227 key_filter: KeyFilter::Multiple(keys),
228 _marker: PhantomData,
229 }
230 }
231
232 pub fn new_lazy(
233 connection: ConnectionManager,
234 store: SharedStore,
235 entity_name: String,
236 subscription_view: String,
237 key_filter: KeyFilter,
238 subscription_key: Option<String>,
239 ) -> Self {
240 Self::new_lazy_with_opts(
241 connection,
242 store,
243 entity_name,
244 subscription_view,
245 key_filter,
246 subscription_key,
247 None,
248 None,
249 )
250 }
251
252 #[allow(clippy::too_many_arguments)]
253 pub fn new_lazy_with_opts(
254 connection: ConnectionManager,
255 store: SharedStore,
256 entity_name: String,
257 subscription_view: String,
258 key_filter: KeyFilter,
259 subscription_key: Option<String>,
260 take: Option<u32>,
261 skip: Option<u32>,
262 ) -> Self {
263 Self {
264 state: EntityStreamState::Lazy {
265 connection,
266 store,
267 subscription_view,
268 subscription_key,
269 take,
270 skip,
271 },
272 view: entity_name,
273 key_filter,
274 _marker: PhantomData,
275 }
276 }
277
278 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
279 where
280 F: FnMut(&Update<T>) -> bool,
281 {
282 FilteredStream::new(self, predicate)
283 }
284
285 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
286 where
287 F: FnMut(Update<T>) -> Option<U>,
288 {
289 FilterMapStream::new(self, f)
290 }
291
292 pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
293 where
294 F: FnMut(Update<T>) -> U,
295 {
296 MapStream::new(self, f)
297 }
298}
299
300impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
301 type Item = Update<T>;
302
303 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304 let this = self.get_mut();
305
306 loop {
307 match &mut this.state {
308 EntityStreamState::Lazy { .. } => {
309 let EntityStreamState::Lazy {
310 connection,
311 store,
312 subscription_view,
313 subscription_key,
314 take,
315 skip,
316 } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
317 else {
318 unreachable!()
319 };
320
321 let inner = BroadcastStream::new(store.subscribe());
324
325 let conn = connection.clone();
326 let view = subscription_view.clone();
327 let key = subscription_key.clone();
328 let fut = Box::pin(async move {
329 conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
330 .await;
331 });
332
333 this.state = EntityStreamState::Subscribing { fut, inner };
334 continue;
335 }
336 EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
337 Poll::Ready(()) => {
338 let EntityStreamState::Subscribing { inner, .. } =
339 std::mem::replace(&mut this.state, EntityStreamState::Invalid)
340 else {
341 unreachable!()
342 };
343 this.state = EntityStreamState::Active { inner };
344 continue;
345 }
346 Poll::Pending => return Poll::Pending,
347 },
348 EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
349 Poll::Ready(Some(Ok(update))) => {
350 if update.view != this.view {
351 continue;
352 }
353
354 if !this.key_filter.matches(&update.key) {
355 continue;
356 }
357
358 match update.operation {
359 Operation::Delete => {
360 return Poll::Ready(Some(Update::Delete { key: update.key }));
361 }
362 Operation::Upsert | Operation::Create | Operation::Snapshot => {
363 if let Some(data) = update.data {
364 if let Ok(typed) = serde_json::from_value::<T>(data) {
365 return Poll::Ready(Some(Update::Upsert {
366 key: update.key,
367 data: typed,
368 }));
369 }
370 }
371 }
372 Operation::Patch => {
373 if let Some(data) = update.data {
374 match serde_json::from_value::<T>(data) {
375 Ok(typed) => {
376 return Poll::Ready(Some(Update::Patch {
377 key: update.key,
378 data: typed,
379 }));
380 }
381 Err(e) => {
382 tracing::warn!(
383 key = %update.key,
384 error = %e,
385 "Patch failed to deserialize to full type, skipping"
386 );
387 continue;
388 }
389 }
390 }
391 }
392 Operation::Subscribed => {
393 continue;
394 }
395 }
396 }
397 Poll::Ready(Some(Err(_lagged))) => {
398 tracing::warn!("EntityStream lagged behind, some messages were dropped");
399 continue;
400 }
401 Poll::Ready(None) => {
402 return Poll::Ready(None);
403 }
404 Poll::Pending => {
405 return Poll::Pending;
406 }
407 },
408 EntityStreamState::Invalid => {
409 panic!("EntityStream in invalid state");
410 }
411 EntityStreamState::_Phantom(_) => unreachable!(),
412 }
413 }
414 }
415}
416
417pub struct RichEntityStream<T> {
418 state: RichEntityStreamState<T>,
419 view: String,
420 key_filter: KeyFilter,
421 _marker: PhantomData<T>,
422}
423
424enum RichEntityStreamState<T> {
425 Lazy {
426 connection: ConnectionManager,
427 store: SharedStore,
428 subscription_view: String,
429 subscription_key: Option<String>,
430 take: Option<u32>,
431 skip: Option<u32>,
432 },
433 Active {
434 inner: BroadcastStream<StoreUpdate>,
435 },
436 Subscribing {
437 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
438 inner: BroadcastStream<StoreUpdate>,
439 },
440 Invalid,
441 _Phantom(PhantomData<T>),
442}
443
444impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
445 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
446 Self {
447 state: RichEntityStreamState::Active {
448 inner: BroadcastStream::new(rx),
449 },
450 view,
451 key_filter: KeyFilter::None,
452 _marker: PhantomData,
453 }
454 }
455
456 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
457 Self {
458 state: RichEntityStreamState::Active {
459 inner: BroadcastStream::new(rx),
460 },
461 view,
462 key_filter: KeyFilter::Single(key),
463 _marker: PhantomData,
464 }
465 }
466
467 pub fn new_lazy(
468 connection: ConnectionManager,
469 store: SharedStore,
470 entity_name: String,
471 subscription_view: String,
472 key_filter: KeyFilter,
473 subscription_key: Option<String>,
474 ) -> Self {
475 Self::new_lazy_with_opts(
476 connection,
477 store,
478 entity_name,
479 subscription_view,
480 key_filter,
481 subscription_key,
482 None,
483 None,
484 )
485 }
486
487 #[allow(clippy::too_many_arguments)]
488 pub fn new_lazy_with_opts(
489 connection: ConnectionManager,
490 store: SharedStore,
491 entity_name: String,
492 subscription_view: String,
493 key_filter: KeyFilter,
494 subscription_key: Option<String>,
495 take: Option<u32>,
496 skip: Option<u32>,
497 ) -> Self {
498 Self {
499 state: RichEntityStreamState::Lazy {
500 connection,
501 store,
502 subscription_view,
503 subscription_key,
504 take,
505 skip,
506 },
507 view: entity_name,
508 key_filter,
509 _marker: PhantomData,
510 }
511 }
512}
513
514impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
515 type Item = RichUpdate<T>;
516
517 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
518 let this = self.get_mut();
519
520 loop {
521 match &mut this.state {
522 RichEntityStreamState::Lazy { .. } => {
523 let RichEntityStreamState::Lazy {
524 connection,
525 store,
526 subscription_view,
527 subscription_key,
528 take,
529 skip,
530 } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
531 else {
532 unreachable!()
533 };
534
535 let inner = BroadcastStream::new(store.subscribe());
538
539 let conn = connection.clone();
540 let view = subscription_view.clone();
541 let key = subscription_key.clone();
542 let fut = Box::pin(async move {
543 conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
544 .await;
545 });
546
547 this.state = RichEntityStreamState::Subscribing { fut, inner };
548 continue;
549 }
550 RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
551 Poll::Ready(()) => {
552 let RichEntityStreamState::Subscribing { inner, .. } =
553 std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
554 else {
555 unreachable!()
556 };
557 this.state = RichEntityStreamState::Active { inner };
558 continue;
559 }
560 Poll::Pending => return Poll::Pending,
561 },
562 RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
563 Poll::Ready(Some(Ok(update))) => {
564 if update.view != this.view {
565 continue;
566 }
567
568 if !this.key_filter.matches(&update.key) {
569 continue;
570 }
571
572 let previous: Option<T> =
573 update.previous.and_then(|v| serde_json::from_value(v).ok());
574
575 match update.operation {
576 Operation::Delete => {
577 return Poll::Ready(Some(RichUpdate::Deleted {
578 key: update.key,
579 last_known: previous,
580 }));
581 }
582 Operation::Create | Operation::Snapshot => {
583 if let Some(data) = update.data {
584 if let Ok(typed) = serde_json::from_value::<T>(data) {
585 return Poll::Ready(Some(RichUpdate::Created {
586 key: update.key,
587 data: typed,
588 }));
589 }
590 }
591 }
592 Operation::Upsert | Operation::Patch => {
593 if let Some(data) = update.data {
594 match serde_json::from_value::<T>(data.clone()) {
595 Ok(after) => {
596 if let Some(before) = previous {
597 return Poll::Ready(Some(RichUpdate::Updated {
598 key: update.key,
599 before,
600 after,
601 patch: update.patch,
602 }));
603 } else {
604 return Poll::Ready(Some(RichUpdate::Created {
605 key: update.key,
606 data: after,
607 }));
608 }
609 }
610 Err(e) => {
611 tracing::warn!(
612 key = %update.key,
613 error = %e,
614 "Update failed to deserialize, skipping"
615 );
616 continue;
617 }
618 }
619 }
620 }
621 Operation::Subscribed => {
622 continue;
623 }
624 }
625 }
626 Poll::Ready(Some(Err(_lagged))) => {
627 tracing::warn!(
628 "RichEntityStream lagged behind, some messages were dropped"
629 );
630 continue;
631 }
632 Poll::Ready(None) => {
633 return Poll::Ready(None);
634 }
635 Poll::Pending => {
636 return Poll::Pending;
637 }
638 },
639 RichEntityStreamState::Invalid => {
640 panic!("RichEntityStream in invalid state");
641 }
642 RichEntityStreamState::_Phantom(_) => unreachable!(),
643 }
644 }
645 }
646}
647
648impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
649 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
650 where
651 F: FnMut(&RichUpdate<T>) -> bool,
652 {
653 FilteredStream::new(self, predicate)
654 }
655
656 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
657 where
658 F: FnMut(RichUpdate<T>) -> Option<U>,
659 {
660 FilterMapStream::new(self, f)
661 }
662
663 pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
664 where
665 F: FnMut(RichUpdate<T>) -> U,
666 {
667 MapStream::new(self, f)
668 }
669}
670
671pin_project! {
672 pub struct FilteredStream<S, I, F> {
673 #[pin]
674 inner: S,
675 predicate: F,
676 _item: PhantomData<I>,
677 }
678}
679
680impl<S, I, F> FilteredStream<S, I, F> {
681 pub fn new(inner: S, predicate: F) -> Self {
682 Self {
683 inner,
684 predicate,
685 _item: PhantomData,
686 }
687 }
688}
689
690impl<S, I, F> Stream for FilteredStream<S, I, F>
691where
692 S: Stream<Item = I>,
693 F: FnMut(&I) -> bool,
694{
695 type Item = I;
696
697 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
698 let mut this = self.project();
699 loop {
700 match this.inner.as_mut().poll_next(cx) {
701 Poll::Ready(Some(item)) => {
702 if (this.predicate)(&item) {
703 return Poll::Ready(Some(item));
704 }
705 }
706 Poll::Ready(None) => return Poll::Ready(None),
707 Poll::Pending => return Poll::Pending,
708 }
709 }
710 }
711}
712
713impl<S, I, F> FilteredStream<S, I, F>
714where
715 S: Stream<Item = I>,
716 F: FnMut(&I) -> bool,
717{
718 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
719 where
720 F2: FnMut(&I) -> bool,
721 {
722 FilteredStream::new(self, predicate)
723 }
724
725 pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
726 where
727 F2: FnMut(I) -> Option<U>,
728 {
729 FilterMapStream::new(self, f)
730 }
731
732 pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
733 where
734 F2: FnMut(I) -> U,
735 {
736 MapStream::new(self, f)
737 }
738}
739
740pin_project! {
741 pub struct FilterMapStream<S, I, U, F> {
742 #[pin]
743 inner: S,
744 f: F,
745 _item: PhantomData<(I, U)>,
746 }
747}
748
749impl<S, I, U, F> FilterMapStream<S, I, U, F> {
750 pub fn new(inner: S, f: F) -> Self {
751 Self {
752 inner,
753 f,
754 _item: PhantomData,
755 }
756 }
757}
758
759impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
760where
761 S: Stream<Item = I>,
762 F: FnMut(I) -> Option<U>,
763{
764 type Item = U;
765
766 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
767 let mut this = self.project();
768 loop {
769 match this.inner.as_mut().poll_next(cx) {
770 Poll::Ready(Some(item)) => {
771 if let Some(mapped) = (this.f)(item) {
772 return Poll::Ready(Some(mapped));
773 }
774 }
775 Poll::Ready(None) => return Poll::Ready(None),
776 Poll::Pending => return Poll::Pending,
777 }
778 }
779 }
780}
781
782impl<S, I, U, F> FilterMapStream<S, I, U, F>
783where
784 S: Stream<Item = I>,
785 F: FnMut(I) -> Option<U>,
786{
787 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
788 where
789 F2: FnMut(&U) -> bool,
790 {
791 FilteredStream::new(self, predicate)
792 }
793
794 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
795 where
796 F2: FnMut(U) -> Option<V>,
797 {
798 FilterMapStream::new(self, f)
799 }
800
801 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
802 where
803 F2: FnMut(U) -> V,
804 {
805 MapStream::new(self, f)
806 }
807}
808
809pin_project! {
810 pub struct MapStream<S, I, U, F> {
811 #[pin]
812 inner: S,
813 f: F,
814 _item: PhantomData<(I, U)>,
815 }
816}
817
818impl<S, I, U, F> MapStream<S, I, U, F> {
819 pub fn new(inner: S, f: F) -> Self {
820 Self {
821 inner,
822 f,
823 _item: PhantomData,
824 }
825 }
826}
827
828impl<S, I, U, F> Stream for MapStream<S, I, U, F>
829where
830 S: Stream<Item = I>,
831 F: FnMut(I) -> U,
832{
833 type Item = U;
834
835 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
836 let this = self.project();
837 match this.inner.poll_next(cx) {
838 Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
839 Poll::Ready(None) => Poll::Ready(None),
840 Poll::Pending => Poll::Pending,
841 }
842 }
843}
844
845impl<S, I, U, F> MapStream<S, I, U, F>
846where
847 S: Stream<Item = I>,
848 F: FnMut(I) -> U,
849{
850 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
851 where
852 F2: FnMut(&U) -> bool,
853 {
854 FilteredStream::new(self, predicate)
855 }
856
857 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
858 where
859 F2: FnMut(U) -> Option<V>,
860 {
861 FilterMapStream::new(self, f)
862 }
863
864 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
865 where
866 F2: FnMut(U) -> V,
867 {
868 MapStream::new(self, f)
869 }
870}