1use std::cmp::Ord;
4use std::fmt::Debug;
5use std::fmt::Display;
6
7use async_trait::async_trait;
8use futures::{FutureExt, StreamExt};
9use futures::future::BoxFuture;
10use futures::stream::FuturesUnordered;
11
/// Identifier of a system, used to express ordering constraints between systems.
///
/// IDs are plain `u32` wrappers: they are cheap to copy, totally ordered (so they
/// can be kept in sorted lists and binary-searched) and hashable (so they can be
/// used as keys in `HashMap`/`HashSet`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SystemID(pub u32);
17
18#[async_trait]
20pub trait System {
21 async fn update(&mut self);
22}
23
24#[async_trait]
25impl<F> System for F
26 where F: (Fn() -> BoxFuture<'static, ()>) + Send + Sync
27{
28 async fn update(&mut self) {
29 (self)().await
30 }
31}
32
33pub struct BoxSystem {
35 system: Box<dyn System + Send>,
36 before: Vec<SystemID>,
37 after: Vec<SystemID>,
38}
39
40impl BoxSystem {
41 pub fn new<S: System + Send + 'static>(s: S) -> BoxSystem {
43 BoxSystem {
44 system: Box::new(s),
45
46 before: Vec::new(),
47 after: Vec::new(),
48 }
49 }
50
51 pub fn before(mut self, system: SystemID) -> Self {
54 if let Err(insert_idx) = self.before.binary_search(&system) {
55 self.before.insert(insert_idx, system);
56 }
57
58 self
59 }
60
61 pub fn after(mut self, system: SystemID) -> Self {
64 if let Err(insert_idx) = self.after.binary_search(&system) {
65 self.after.insert(insert_idx, system);
66 }
67
68 self
69 }
70
71 pub fn update(&mut self) -> BoxFuture<()> {
73 self.system.update()
74 }
75}
76
/// Error value whose message reports conflicting system requirements.
///
/// It carries no data; the message is fixed.
#[derive(Debug, Clone)]
pub struct SystemRegistrationError;

impl Display for SystemRegistrationError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, out: &mut std::fmt::Formatter) -> std::fmt::Result {
        out.write_str("conflicting system requirements")
    }
}

impl std::error::Error for SystemRegistrationError {}
88
89pub struct SystemGroup {
91 next_system_id: u32,
92
93 dependencies: Vec<SystemID>,
94 system_dependencies: Vec<(usize, usize)>,
95 systems: Vec<BoxSystem>,
96}
97
98impl SystemGroup {
99 pub fn new() -> SystemGroup {
101 SystemGroup {
102 next_system_id: 0,
103
104 dependencies: Vec::new(),
105 system_dependencies: Vec::new(),
106 systems: Vec::new(),
107 }
108 }
109
110 fn calculate_dependencies(&mut self) {
111 self.dependencies.clear();
112 self.system_dependencies.clear();
113
114 fn add_dep(deps: &mut Vec<SystemID>, offset: usize, id: SystemID) {
115 if let Err(idx) = deps[offset..].binary_search(&id) {
116 deps.insert(idx + offset, id);
117 }
118 }
119
120 for (idx_a, system) in self.systems.iter().enumerate() {
121 let id_a = SystemID(idx_a as u32);
122 let start = self.dependencies.len();
123
124 for dep in system.after.iter().copied() {
125 if dep.0 >= self.systems.len() as u32 {
126 break;
127 }
128
129 add_dep(&mut self.dependencies, start, dep);
130 }
131
132 for (idx_b, system) in self.systems.iter().enumerate() {
133 let id_b = SystemID(idx_b as u32);
134 if idx_b == idx_a {
135 continue;
136 }
137
138 if system.before.binary_search(&id_a).is_ok() {
139 add_dep(&mut self.dependencies, start, id_b);
140 }
141 }
142
143 self.system_dependencies.push((start, self.dependencies.len()));
144 }
145 }
146
147 pub fn insert(&mut self, system: BoxSystem) -> SystemID {
149 let token = SystemID(self.next_system_id);
150 self.next_system_id += 1;
151 self.systems.push(system);
152 self.calculate_dependencies();
153 token
154 }
155
156 pub async fn update(&mut self) {
158 let mut pending = self.systems.iter_mut()
159 .enumerate()
160 .map(|(idx, s)| (idx, s, 0))
161 .collect::<Vec<_>>();
162 let mut running = FuturesUnordered::new();
163 let mut last_completed = None;
164
165 loop {
166 let mut idx = 0;
167 while idx < pending.len() {
168 let (system_id, _, n) = &mut pending[idx];
169 let (start, end) = self.system_dependencies[*system_id];
170 let rest = &self.dependencies[start + *n..end];
171
172 if !rest.is_empty() && rest.first().copied() == last_completed {
173 *n += 1;
174 }
175
176 if start + *n >= end {
177 let (id, sys, _) = pending.remove(idx);
178
179 let f = async move {
180 sys.update().await;
181 SystemID(id as u32)
182 }.boxed();
183 running.push(f);
184 continue;
185 }
186
187 idx += 1;
188 }
189
190 if running.is_empty() {
191 break;
193 }
194
195 let finished = running.next().await.unwrap();
196 last_completed = Some(finished);
197 }
198 }
199}