nexustack/application/
node.rs1use crate::{
9 ApplicationPart,
10 application::{
11 ApplicationPartBuilder, Chain,
12 chain::{InHead, InTail, Index},
13 configurable::Configurable,
14 instrumentation::WithInstrumentation,
15 },
16 inject::{ConstructionResult, ServiceProvider},
17};
18use either::Either;
19use futures_util::TryFutureExt;
20use std::{any::TypeId, borrow::Cow};
21use tokio_util::sync::CancellationToken;
22
/// A pair made of a `head` element and a `tail` element.
///
/// The trait implementations in this module forward to both fields, so a
/// `Node` can stand in wherever a single element is expected. The tail is
/// presumably another `Node` or a terminating element — confirm against the
/// code that constructs these values.
pub struct Node<Head, Tail> {
    /// The first element of the pair.
    pub(crate) head: Head,
    /// Everything that follows `head`.
    pub(crate) tail: Tail,
}
38
39impl<Head, Tail> ApplicationPart for Node<Head, Tail>
40where
41 Head: ApplicationPart + Send + Sync,
42 Tail: ApplicationPart + Send + Sync,
43{
44 type Error = Either<Head::Error, Tail::Error>;
45
46 fn name() -> Cow<'static, str> {
47 match (Head::name(), Tail::name()) {
48 (Cow::Borrowed(head), Cow::Borrowed(tail)) => Cow::Owned(format!("{head}, {tail}")),
49 (Cow::Borrowed(head), Cow::Owned(mut tail)) => {
50 if (tail.capacity() - tail.len()) >= (head.len() + 2) {
51 tail.insert_str(0, ", ");
52 tail.insert_str(0, head);
53 Cow::Owned(tail)
54 } else {
55 Cow::Owned(format!("{head}, {tail}"))
57 }
58 }
59 (Cow::Owned(mut head), Cow::Borrowed(tail)) => {
60 head.push(',');
61 head.push(' ');
62 head.push_str(tail);
63 Cow::Owned(head)
64 }
65 (Cow::Owned(mut head), Cow::Owned(tail)) => {
66 head.push(',');
67 head.push(' ');
68 head.push_str(tail.as_str());
69 Cow::Owned(head)
70 }
71 }
72 }
73
74 async fn before_startup(
75 &mut self,
76 cancellation_token: CancellationToken,
77 ) -> Result<(), Self::Error> {
78 tokio::try_join!(
79 self.head
80 .before_startup(cancellation_token.clone())
81 .map_err(Either::Left),
82 self.tail
83 .before_startup(cancellation_token)
84 .map_err(Either::Right)
85 )
86 .map(|_| ())
87 }
88
89 async fn run(&mut self, cancellation_token: CancellationToken) -> Result<(), Self::Error> {
90 tokio::try_join!(
91 self.head
92 .run(cancellation_token.clone())
93 .map_err(Either::Left),
94 self.tail.run(cancellation_token).map_err(Either::Right)
95 )
96 .map(|_| ())
97 }
98
99 async fn before_shutdown(
100 &mut self,
101 cancellation_token: CancellationToken,
102 ) -> Result<(), Self::Error> {
103 tokio::try_join!(
104 self.head
105 .before_shutdown(cancellation_token.clone())
106 .map_err(Either::Left),
107 self.tail
108 .before_shutdown(cancellation_token)
109 .map_err(Either::Right)
110 )
111 .map(|_| ())
112 }
113}
114
115impl<Head, Tail> ApplicationPartBuilder for Node<Head, Tail>
116where
117 Head: ApplicationPartBuilder,
118 Tail: ApplicationPartBuilder,
119{
120 type ApplicationPart = Node<WithInstrumentation<Head::ApplicationPart>, Tail::ApplicationPart>;
121
122 fn build(self, service_provider: ServiceProvider) -> ConstructionResult<Self::ApplicationPart> {
123 Ok(Node {
124 head: WithInstrumentation(self.head.build(service_provider.clone())?),
125 tail: self.tail.build(service_provider)?,
126 })
127 }
128}
129
130impl<Head, Tail> Configurable<'static> for Node<Head, Tail>
131where
132 Head: ApplicationPartBuilder + 'static,
133 Tail: ApplicationPartBuilder + Configurable<'static>,
134{
135 fn has_item<I: 'static>() -> bool {
136 TypeId::of::<Head>() == TypeId::of::<I>() || <Tail as Configurable<'_>>::has_item::<I>()
138 }
139}
140
141impl<Head, Tail, HeadIndex> Chain<InHead<HeadIndex>> for Node<Head, Tail>
142where
143 HeadIndex: Index,
144 Head: Chain<HeadIndex>,
145{
146 type Element = Head::Element;
147 fn get(&self) -> &Self::Element {
148 self.head.get()
149 }
150
151 fn get_mut(&mut self) -> &mut Self::Element {
152 self.head.get_mut()
153 }
154}
155
156impl<Head, Tail, TailIndex> Chain<InTail<TailIndex>> for Node<Head, Tail>
157where
158 TailIndex: Index,
159 Tail: Chain<TailIndex>,
160{
161 type Element = Tail::Element;
162 fn get(&self) -> &Self::Element {
163 self.tail.get()
164 }
165
166 fn get_mut(&mut self) -> &mut Self::Element {
167 self.tail.get_mut()
168 }
169}