async_component_core/
context.rs1use std::{
2 cell::RefCell,
3 pin::Pin,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8 task::{Context, Poll, Wake, Waker},
9};
10
11use atomic_waker::AtomicWaker;
12use futures_core::Stream;
13
14use crate::AsyncComponent;
15
16thread_local! {
17 static CONTEXT: RefCell<Option<StateContext>> = RefCell::new(None);
18}
19
20pub fn with_current_context<R>(func: impl FnOnce(&StateContext) -> R) -> R {
21 CONTEXT.with(|cx| match *cx.borrow() {
22 Some(ref cx) => func(cx),
23 None => panic!("Called without state context"),
24 })
25}
26
27#[derive(Debug)]
28struct EnterContextGuard {}
29
30impl Drop for EnterContextGuard {
31 fn drop(&mut self) {
32 CONTEXT.with(|cell| {
33 *cell.borrow_mut() = None;
34 })
35 }
36}
37
38fn enter_guarded(cx: StateContext) -> EnterContextGuard {
39 CONTEXT.with(|cell| {
40 {
41 let mut cell = cell.borrow_mut();
42
43 if cell.is_some() {
44 panic!("State context is already set");
45 }
46
47 *cell = Some(cx);
48 }
49
50 EnterContextGuard {}
51 })
52}
53
54#[derive(Debug)]
55pub struct ComponentStream<C> {
56 inner: Arc<Inner>,
57 component: C,
58}
59
60impl<C: AsyncComponent> ComponentStream<C> {
61 pub fn new(func: impl FnOnce() -> C) -> Self {
63 let inner = Arc::new(Inner::default());
64
65 let component = {
66 let _guard = enter_guarded(StateContext::new(Waker::from(inner.clone())));
67
68 func()
69 };
70
71 Self { inner, component }
72 }
73
74 pub fn enter<'a>(&'a mut self) -> EnteredComponentStream<'a, C> {
76 EnteredComponentStream {
77 _guard: enter_guarded(StateContext::new(Waker::from(self.inner.clone()))),
78 stream: self,
79 }
80 }
81}
82
83impl<C: AsyncComponent> Unpin for ComponentStream<C> {}
84
85#[derive(Debug)]
86pub struct EnteredComponentStream<'a, C> {
87 _guard: EnterContextGuard,
88 stream: &'a mut ComponentStream<C>,
89}
90
91impl<C> EnteredComponentStream<'_, C> {
92 pub fn component(&self) -> &C {
93 &self.stream.component
94 }
95
96 pub fn component_mut(&mut self) -> &mut C {
97 &mut self.stream.component
98 }
99}
100
101impl<C: AsyncComponent> Stream for EnteredComponentStream<'_, C> {
102 type Item = ();
103
104 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
105 self.stream.inner.waker.register(cx.waker());
106
107 if self.stream.inner.updated.swap(false, Ordering::Relaxed) {
108 self.stream.component.update_component();
109 Poll::Ready(Some(()))
110 } else {
111 Poll::Pending
112 }
113 }
114}
115
/// Cloneable handle around a [`Waker`] that is used to report state changes.
#[derive(Debug, Clone)]
pub struct StateContext(Waker);

impl StateContext {
    /// Wraps `waker` in a new context.
    pub(crate) const fn new(waker: Waker) -> Self {
        Self(waker)
    }

    /// Wakes the underlying waker without consuming it.
    pub fn signal(&self) {
        let Self(waker) = self;
        waker.wake_by_ref();
    }

    /// Builds a task [`Context`] that borrows this context's waker.
    pub fn task_context<'a>(&'a self) -> Context<'a> {
        let Self(waker) = self;
        Context::from_waker(waker)
    }
}
134
135#[derive(Debug)]
136struct Inner {
137 updated: AtomicBool,
138 waker: AtomicWaker,
139}
140
141impl Wake for Inner {
142 fn wake(self: Arc<Self>) {
143 self.wake_by_ref()
144 }
145
146 fn wake_by_ref(self: &Arc<Self>) {
147 self.updated.store(true, Ordering::Relaxed);
148 self.waker.wake()
149 }
150}
151
152impl Default for Inner {
153 fn default() -> Self {
154 Self {
155 updated: AtomicBool::new(true),
156 waker: Default::default(),
157 }
158 }
159}