1use core::{
10 cell::Cell,
11 fmt,
12 marker::PhantomData,
13 pin::{Pin, pin},
14 task::Waker,
15};
16use intrusive_collections::{LinkedList, LinkedListLink, UnsafeRef, intrusive_adapter};
17
18use crate::{Publisher, Subscriber};
19
20pub struct Chain<N = Waker> {
27 inner: Cell<LinkedList<LinkAdapter<N>>>,
28}
29
30impl<N> Chain<N> {
31 pub fn new() -> Self {
33 Chain {
34 inner: Cell::new(LinkedList::new(LinkAdapter::NEW)),
35 }
36 }
37
38 pub fn clear(&mut self) {
40 self.inner.get_mut().clear();
41 }
42
43 pub fn is_empty(&self) -> bool {
45 let inner = self.inner.take();
46 let res = inner.is_empty();
47 self.inner.set(inner);
48 res
49 }
50
51 pub fn notify_one(&self) -> NotifyOne<'_, N>
61 where
62 N: Subscriber,
63 {
64 NotifyOne::new(self)
65 }
66
67 pub fn notify_all(&self) -> NotifyAll<'_, N>
77 where
78 N: Subscriber,
79 {
80 NotifyAll::new(self)
81 }
82
83 fn pop_front(&self) -> Option<UnsafeRef<Link<N>>> {
87 let mut inner = self.inner.take();
88 let res = inner.pop_front();
89 self.inner.set(inner);
90 res
91 }
92
93 fn pop_back(&self) -> Option<UnsafeRef<Link<N>>> {
97 let mut inner = self.inner.take();
98 let res = inner.pop_back();
99 self.inner.set(inner);
100 res
101 }
102}
103
104impl Chain<Waker> {
106 pub async fn fifo(&self) {
110 use odem_rs_core::ops::{sleep, waker};
111
112 let link = pin!(Link::new(waker().await));
114
115 let mut inner = self.inner.take();
117 unsafe {
118 inner.push_back(UnsafeRef::from_raw(&*link));
119 }
120 self.inner.set(inner);
121
122 scopeguard::defer! {
124 unsafe {
125 self.unsubscribe(link.as_ref());
126 }
127 }
128
129 sleep().await;
131 }
132
133 pub async fn lifo(&self) {
137 use odem_rs_core::ops::{sleep, waker};
138
139 let link = pin!(Link::new(waker().await));
141
142 let mut inner = self.inner.take();
144 unsafe {
145 inner.push_front(UnsafeRef::from_raw(&*link));
146 }
147 self.inner.set(inner);
148
149 scopeguard::defer! {
151 unsafe {
152 self.unsubscribe(link.as_ref());
153 }
154 }
155
156 sleep().await;
158 }
159}
160
161impl<N: Subscriber> Subscriber for Chain<N> {
162 fn notify(&self) {
164 self.notify_all().go();
165 }
166}
167
168impl<N: Subscriber> Publisher for Chain<N> {
169 type Link = Link<N>;
170
171 unsafe fn subscribe(&self, link: Pin<&Self::Link>) {
172 let mut inner = self.inner.take();
173 unsafe {
174 inner.push_back(UnsafeRef::from_raw(&*link));
175 }
176 self.inner.set(inner);
177 }
178
179 unsafe fn unsubscribe(&self, link: Pin<&Self::Link>) {
180 if link.hook.is_linked() {
181 let mut inner = self.inner.take();
182 unsafe {
183 inner.cursor_mut_from_ptr(&*link).remove();
184 }
185 self.inner.set(inner);
186 }
187 }
188}
189
190impl<N: fmt::Debug> fmt::Debug for Chain<N> {
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 let inner = self.inner.take();
193 let res = inner.fmt(f);
194 self.inner.set(inner);
195 res
196 }
197}
198
199impl<N> Default for Chain<N> {
200 fn default() -> Self {
201 Self::new()
202 }
203}
204
205impl<N> Drop for Chain<N> {
206 fn drop(&mut self) {
207 self.clear();
208 }
209}
210
211#[pin_project::pin_project]
216pub struct Link<N: ?Sized> {
217 _mark: PhantomData<*const ()>,
219 #[pin]
221 hook: LinkedListLink,
222 pub note: N,
224}
225
226impl<N> Link<N> {
227 pub const fn new(waker: N) -> Self {
229 Link {
230 hook: LinkedListLink::new(),
231 note: waker,
232 _mark: PhantomData,
233 }
234 }
235
236 pub fn is_linked(&self) -> bool {
238 self.hook.is_linked()
239 }
240
241 pub fn into_inner(self) -> N {
243 self.note
244 }
245}
246
247impl<N> From<N> for Link<N> {
248 fn from(value: N) -> Self {
249 Self::new(value)
250 }
251}
252
253impl<N: ?Sized + Subscriber> Subscriber for Link<N> {
254 fn notify(&self) {
255 self.note.notify();
256 }
257}
258
259impl<N: fmt::Debug> fmt::Debug for Link<N> {
260 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
261 f.debug_struct("Link")
262 .field("hook", &self.hook)
263 .field("note", &self.note)
264 .finish()
265 }
266}
267
268impl fmt::Debug for Link<dyn Subscriber> {
269 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270 f.debug_struct("Link").field("hook", &self.hook).finish()
271 }
272}
273
274#[doc(hidden)]
275mod adapter {
276 #![allow(missing_docs)]
277 use super::*;
278
279 intrusive_adapter! {
280 pub LinkAdapter<N> = UnsafeRef<Link<N>>: Link<N> {
283 hook: LinkedListLink
284 }
285 }
286}
287
288pub use adapter::*;
289
/// A prepared operation that is executed by consuming it.
pub trait Trigger {
    /// The value produced by running the operation.
    type Output;

    /// Consumes the operation, runs it, and returns its result.
    fn trigger(self) -> Self::Output;
}
301
302#[must_use = "value must be consumed with go() in order to run"]
307pub struct NotifyOne<'c, N, P = (), F = (), const FIRST: bool = true> {
308 chain: &'c Chain<N>,
310 filter: P,
313 action: F,
317}
318
319impl<'c, N: Subscriber> NotifyOne<'c, N, (), ()> {
320 const fn new(chain: &'c Chain<N>) -> Self {
323 NotifyOne {
324 chain,
325 filter: (),
326 action: (),
327 }
328 }
329}
330
331impl<'c, N: Subscriber, P, F, const FIRST: bool> NotifyOne<'c, N, P, F, FIRST> {
332 pub fn filter<X: Fn(&N) -> bool>(self, filter: X) -> NotifyOne<'c, N, X, F> {
336 NotifyOne {
337 chain: self.chain,
338 filter,
339 action: self.action,
340 }
341 }
342
343 pub fn then<X: FnOnce(&N) -> R, R>(self, action: X) -> NotifyOne<'c, N, P, X> {
346 NotifyOne {
347 chain: self.chain,
348 filter: self.filter,
349 action,
350 }
351 }
352
353 pub fn first(self) -> NotifyOne<'c, N, P, F, true> {
355 NotifyOne {
356 chain: self.chain,
357 filter: self.filter,
358 action: self.action,
359 }
360 }
361
362 pub fn last(self) -> NotifyOne<'c, N, P, F, false> {
364 NotifyOne {
365 chain: self.chain,
366 filter: self.filter,
367 action: self.action,
368 }
369 }
370
371 pub fn go(self) -> <Self as Trigger>::Output
373 where
374 Self: Trigger,
375 {
376 Trigger::trigger(self)
377 }
378}
379
380impl<N, P, F, R, const FIRST: bool> Trigger for NotifyOne<'_, N, P, F, FIRST>
381where
382 N: Subscriber,
383 P: Fn(&N) -> bool,
384 F: FnOnce(&N) -> R,
385{
386 type Output = Option<R>;
387
388 fn trigger(self) -> Self::Output {
389 let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
391
392 let mut cur = if FIRST {
394 chain.front_mut()
395 } else {
396 chain.back_mut()
397 };
398 let mut out = None;
399
400 while let Some(link) = cur.get() {
402 if (self.filter)(&link.note) {
404 let res = (self.action)(&link.note);
406 link.notify();
407 cur.remove();
408
409 out = Some(res);
411 break;
412 }
413
414 if FIRST {
416 cur.move_next();
417 } else {
418 cur.move_prev();
419 }
420 }
421
422 self.chain.inner.replace(chain);
424
425 out
426 }
427}
428
429impl<N, P, const FIRST: bool> Trigger for NotifyOne<'_, N, P, (), FIRST>
430where
431 N: Subscriber,
432 P: Fn(&N) -> bool,
433{
434 type Output = ();
435
436 fn trigger(self) -> Self::Output {
437 let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
439
440 let mut cur = if FIRST {
442 chain.front_mut()
443 } else {
444 chain.back_mut()
445 };
446
447 while let Some(link) = cur.get() {
449 if (self.filter)(&link.note) {
451 link.notify();
453 cur.remove();
454 break;
455 }
456
457 if FIRST {
459 cur.move_next();
460 } else {
461 cur.move_prev();
462 }
463 }
464
465 self.chain.inner.replace(chain);
467 }
468}
469
470impl<N, F, R, const FIRST: bool> Trigger for NotifyOne<'_, N, (), F, FIRST>
471where
472 N: Subscriber,
473 F: FnOnce(&N) -> R,
474{
475 type Output = Option<R>;
476
477 fn trigger(self) -> Self::Output {
478 if FIRST {
480 self.chain.pop_front()
481 } else {
482 self.chain.pop_back()
483 }
484 .map(move |link| {
485 let res = (self.action)(&link.note);
487 link.notify();
488 res
489 })
490 }
491}
492
493impl<N, const FIRST: bool> Trigger for NotifyOne<'_, N, (), (), FIRST>
494where
495 N: Subscriber,
496{
497 type Output = ();
498
499 fn trigger(self) -> Self::Output {
500 if let Some(link) = if FIRST {
502 self.chain.pop_front()
503 } else {
504 self.chain.pop_back()
505 } {
506 link.notify();
507 }
508 }
509}
510
511#[must_use = "value must be consumed with go() in order to run"]
516pub struct NotifyAll<'c, N, P = (), F = (), const REV: bool = false> {
517 chain: &'c Chain<N>,
519 filter: P,
522 action: F,
528}
529
530impl<'c, N: Subscriber> NotifyAll<'c, N, (), ()> {
531 const fn new(chain: &'c Chain<N>) -> Self {
536 NotifyAll {
537 chain,
538 filter: (),
539 action: (),
540 }
541 }
542}
543
544impl<'c, N: Subscriber, P, F, const REV: bool> NotifyAll<'c, N, P, F, REV> {
545 pub fn filter<X: Fn(&N) -> bool>(self, filter: X) -> NotifyAll<'c, N, X, F> {
548 NotifyAll {
549 chain: self.chain,
550 filter,
551 action: self.action,
552 }
553 }
554
555 pub fn then<X: FnMut(&N)>(self, action: X) -> NotifyAll<'c, N, P, X> {
558 NotifyAll {
559 chain: self.chain,
560 filter: self.filter,
561 action,
562 }
563 }
564
565 pub fn go(self) -> <Self as Trigger>::Output
567 where
568 Self: Trigger,
569 {
570 Trigger::trigger(self)
571 }
572}
573
574impl<'c, N: Subscriber, P, F> NotifyAll<'c, N, P, F, true> {
575 pub fn rev(self) -> NotifyAll<'c, N, P, F, false> {
577 NotifyAll {
578 chain: self.chain,
579 filter: self.filter,
580 action: self.action,
581 }
582 }
583}
584
585impl<'c, N: Subscriber, P, F> NotifyAll<'c, N, P, F, false> {
586 pub fn rev(self) -> NotifyAll<'c, N, P, F, true> {
588 NotifyAll {
589 chain: self.chain,
590 filter: self.filter,
591 action: self.action,
592 }
593 }
594}
595
596impl<N, P, F, const REV: bool> Trigger for NotifyAll<'_, N, P, F, REV>
597where
598 N: Subscriber,
599 P: Fn(&N) -> bool,
600 F: FnMut(&N),
601{
602 type Output = ();
603
604 fn trigger(mut self) -> Self::Output {
605 let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
607
608 let mut cur = if REV {
610 chain.back_mut()
611 } else {
612 chain.front_mut()
613 };
614
615 while let Some(link) = cur.get() {
617 if (self.filter)(&link.note) {
619 (self.action)(&link.note);
621 link.notify();
622 cur.remove();
623 } else if !REV {
624 cur.move_next();
625 }
626
627 if REV {
628 cur.move_prev();
629 }
630 }
631
632 self.chain.inner.replace(chain);
634 }
635}
636
637impl<N, P, const REV: bool> Trigger for NotifyAll<'_, N, P, (), REV>
638where
639 N: Subscriber,
640 P: Fn(&N) -> bool,
641{
642 type Output = ();
643
644 fn trigger(self) -> Self::Output {
645 let mut chain = self.chain.inner.replace(LinkedList::new(LinkAdapter::NEW));
647
648 let mut cur = if REV {
650 chain.back_mut()
651 } else {
652 chain.front_mut()
653 };
654
655 while let Some(link) = cur.get() {
657 if (self.filter)(&link.note) {
659 cur.remove().unwrap().notify();
661 } else if !REV {
662 cur.move_next();
663 }
664
665 if REV {
666 cur.move_prev();
667 }
668 }
669
670 self.chain.inner.replace(chain);
672 }
673}
674
675impl<N, F, const REV: bool> Trigger for NotifyAll<'_, N, (), F, REV>
676where
677 N: Subscriber,
678 F: FnMut(&N),
679{
680 type Output = ();
681
682 fn trigger(mut self) -> Self::Output {
683 while let Some(link) = if REV {
684 self.chain.pop_back()
685 } else {
686 self.chain.pop_front()
687 } {
688 (self.action)(&link.note);
689 link.notify();
690 }
691 }
692}
693
694impl<N: Subscriber, const REV: bool> Trigger for NotifyAll<'_, N, (), (), REV> {
695 type Output = ();
696
697 fn trigger(self) -> Self::Output {
698 while let Some(link) = if REV {
699 self.chain.pop_back()
700 } else {
701 self.chain.pop_front()
702 } {
703 link.notify();
704 }
705 }
706}
707
708impl Subscriber for Waker {
711 fn notify(&self) {
712 self.wake_by_ref();
713 }
714}