1use std::{
2 any::{Any, TypeId},
3 collections::HashMap,
4 pin::Pin,
5 sync::Arc,
6 task::Poll,
7};
8
9use async_context::provide_async_context;
10use futures_util::{
11 future::{self},
12 stream::{once, FuturesOrdered},
13 task::Spawn,
14 Future, Stream, StreamExt,
15};
16
17use crate::{hook::Hook, Element};
18
19use pin_project::pin_project;
20
21type NodeStreamItem<N, E> = Result<(N, NodeStream<N, E>), E>;
22
23#[pin_project]
24pub struct NodeStream<N, E>(#[pin] Pin<Box<dyn Stream<Item = NodeStreamItem<N, E>> + Send>>);
25
26impl<N, E> Stream for NodeStream<N, E> {
27 type Item = NodeStreamItem<N, E>;
28
29 fn poll_next(
30 self: Pin<&mut Self>,
31 cx: &mut std::task::Context<'_>,
32 ) -> Poll<Option<Self::Item>> {
33 let projection = self.project();
34 projection.0.poll_next(cx)
35 }
36}
37
38impl<N, E> NodeStream<N, E>
39where
40 E: Send + 'static,
41 N: Send + 'static,
42{
43 fn from(stream: impl Stream<Item = NodeStreamItem<N, E>> + Send + 'static) -> Self {
44 Self(Box::pin(stream))
45 }
46
47 fn ready(item: NodeStreamItem<N, E>) -> Self {
48 Self::from(once(future::ready(item)))
49 }
50
51 fn wrap(inner: impl Future<Output = NodeStream<N, E>> + Send + 'static) -> Self {
52 Self(Box::pin(once(inner).flatten()))
53 }
54}
55
56#[derive(Clone)]
57struct RenderContext {
58 context: Arc<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>,
59}
60
61impl RenderContext {
62 fn new() -> Self {
63 Self {
64 context: Arc::new(HashMap::new()),
65 }
66 }
67
68 fn with_context(&self, value: Arc<dyn Any + Send + Sync>) -> Self {
69 let mut new_context = self.context.as_ref().clone();
70 new_context.insert(value.type_id(), value);
71 Self {
72 context: Arc::new(new_context),
73 }
74 }
75}
76
77fn render_element<N, E, S>(
78 element: Element<N, E>,
79 spawner: S,
80 ctx: RenderContext,
81) -> Pin<Box<dyn Future<Output = NodeStream<N, E>> + Send>>
82where
83 N: From<String> + Send + 'static,
84 E: Send + 'static,
85 S: Spawn + Clone + Send + 'static,
86{
87 match element {
88 Element::Component(component) => Box::pin(async move {
89 match provide_async_context(Hook::from_context(ctx.context.clone()), component.render())
90 .await
91 {
92 (Ok(element), _) => render_element(element, spawner, ctx).await,
93 (Err(error), _) => NodeStream::ready(Err(error)),
94 }
95 }),
96 Element::Node(node, children) => Box::pin(future::ready(NodeStream::ready(Ok((
97 node,
98 render_children(children, spawner, ctx),
99 ))))),
100 Element::Fragment(children) => {
101 Box::pin(future::ready(render_children(children, spawner, ctx)))
102 }
103 Element::Provider(provider, children) => Box::pin(future::ready(render_children(
104 children,
105 spawner,
106 ctx.with_context(provider),
107 ))),
108 }
109}
110
111fn render_children<N, E, S>(
112 children: Vec<Element<N, E>>,
113 spawner: S,
114 ctx: RenderContext,
115) -> NodeStream<N, E>
116where
117 N: From<String> + Send + 'static,
118 E: Send + 'static,
119 S: Spawn + Clone + Send + 'static,
120{
121 let children = children
122 .into_iter()
123 .map(|child| render_element(child, spawner.clone(), ctx.clone()))
124 .collect::<FuturesOrdered<_>>()
125 .flatten();
126
127 NodeStream::from(children)
128}
129
130pub fn render_stream<N, E, S>(element: Element<N, E>, spawner: S) -> NodeStream<N, E>
135where
136 N: From<String> + Send + Sync + 'static,
137 E: Send + 'static,
138 S: Spawn + Clone + Send + 'static,
139{
140 NodeStream::wrap(render_element(
141 element,
142 spawner.clone(),
143 RenderContext::new(),
144 ))
145}