1use crate::alloc::collections::BinaryHeap;
2use core::{
3 any::{type_name, TypeId},
4 fmt,
5 mem::MaybeUninit,
6 slice,
7 sync::atomic::{AtomicU32, Ordering},
8};
9
10use crate::{
11 archetype::{TypeIdMap, TypeInfo},
12 Archetype, Component,
13};
14
/// A set of component types to be filled column-by-column when building a
/// [`ColumnBatch`]. Types are kept in a heap so duplicates and ordering can be
/// normalized in `into_batch`.
#[derive(Debug, Clone, Default)]
pub struct ColumnBatchType {
    // Component types registered so far; may contain duplicates until
    // `into_batch` sorts and dedups them.
    types: BinaryHeap<TypeInfo>,
}
20
21impl ColumnBatchType {
22 pub fn new() -> Self {
24 Self::default()
25 }
26
27 pub fn add<T: Component>(&mut self) -> &mut Self {
29 self.types.push(TypeInfo::of::<T>());
30 self
31 }
32
33 pub fn add_dynamic(&mut self, id: TypeInfo) -> &mut Self {
35 self.types.push(id);
36 self
37 }
38
39 pub fn into_batch(self, size: u32) -> ColumnBatchBuilder {
41 assert!(size < u32::MAX);
42 let mut types = self.types.into_sorted_vec();
43 types.dedup();
44 let fill = types
45 .iter()
46 .map(|ty| (ty.id(), AtomicU32::new(0)))
47 .collect();
48 let mut arch = Archetype::new(types);
49 arch.reserve(size);
50 ColumnBatchBuilder {
51 fill,
52 target_fill: size,
53 archetype: Some(arch),
54 }
55 }
56}
57
/// An incomplete collection of component data for entities with the same
/// component types, filled one column at a time via [`writer`](Self::writer).
pub struct ColumnBatchBuilder {
    // Per-column count of initialized elements. A value of `u32::MAX` is a
    // sentinel meaning a `BatchWriter` for that column is currently alive.
    fill: TypeIdMap<AtomicU32>,
    // Number of entities every column must be filled to before `build`.
    target_fill: u32,
    // Backing storage; `None` only after `build` has taken it.
    pub(crate) archetype: Option<Archetype>,
}
65
// SAFETY: the builder's only interior mutability is the `AtomicU32` fill
// counters, and `writer` grants at most one live `&mut` view per column via the
// `u32::MAX` sentinel protocol. NOTE(review): this additionally assumes the
// type-erased component data held by `Archetype` is itself safe to send/share
// across threads — presumably `Component` requires `Send + Sync`; confirm
// against the trait's definition.
unsafe impl Send for ColumnBatchBuilder {}
unsafe impl Sync for ColumnBatchBuilder {}
68
impl ColumnBatchBuilder {
    /// Create a batch builder for the component types in `ty`, sized for
    /// `size` entities. Equivalent to [`ColumnBatchType::into_batch`].
    pub fn new(ty: ColumnBatchType, size: u32) -> Self {
        ty.into_batch(size)
    }

    /// Get a writer for the `T` column, or `None` if `T` was not registered.
    ///
    /// # Panics
    ///
    /// Panics if a `BatchWriter<T>` for this column is still alive. The
    /// column's fill counter holds the `u32::MAX` sentinel while a writer
    /// exists, which makes the writer's `&mut` access exclusive even though
    /// this method only takes `&self`.
    pub fn writer<T: Component>(&self) -> Option<BatchWriter<'_, T>> {
        // `archetype` is only `None` after `build`, which consumes `self`,
        // so it must be present here.
        let archetype = self.archetype.as_ref().unwrap();
        let state = archetype.get_state::<T>()?;
        let base = unsafe { archetype.get_base::<T>(state) };
        let fill_storage = self.fill.get(&TypeId::of::<T>()).unwrap();
        // Acquire the column "lock": publish the sentinel and read back the
        // previous fill. Acquire pairs with the Release store in
        // `BatchWriter::drop`, making prior writes to the column visible.
        let fill = fill_storage.swap(u32::MAX, Ordering::Acquire);
        if fill == u32::MAX {
            panic!("another {} writer still exists", type_name::<T>());
        }
        Some(BatchWriter {
            fill_storage,
            fill,
            // View the column as `target_fill` possibly-uninitialized slots
            // and skip the `fill` elements a previous writer already set.
            // SAFETY(review): relies on the archetype having reserved capacity
            // for `target_fill` elements of `T` at `base`, and on the sentinel
            // above preventing any aliasing writer for this column.
            storage: unsafe {
                &mut slice::from_raw_parts_mut(base.as_ptr().cast(), self.target_fill as usize)
                [fill as usize..]
            }
            .iter_mut(),
        })
    }

    /// Consume the builder, producing the finished batch.
    ///
    /// # Errors
    ///
    /// Returns [`BatchIncomplete`] if any column holds fewer (or, via the
    /// sentinel of a leaked writer, other than) `target_fill` elements.
    pub fn build(mut self) -> Result<ColumnBatch, BatchIncomplete> {
        // Taking the archetype also disarms our `Drop`, which would otherwise
        // tear down the (now fully initialized) columns.
        let mut archetype = self.archetype.take().unwrap();
        // `get_mut` is fine here: no writers can be alive since they borrow
        // `self`, which we own exclusively.
        if archetype
            .types()
            .iter()
            .any(|ty| *self.fill.get_mut(&ty.id()).unwrap().get_mut() != self.target_fill)
        {
            return Err(BatchIncomplete { _opaque: () });
        }
        unsafe {
            // SAFETY: every column contains exactly `target_fill` initialized
            // elements, as checked just above.
            archetype.set_len(self.target_fill);
        }
        Ok(ColumnBatch(archetype))
    }
}
112
113impl Drop for ColumnBatchBuilder {
114 fn drop(&mut self) {
115 if let Some(archetype) = self.archetype.take() {
116 for ty in archetype.types() {
117 let fill = *self.fill.get_mut(&ty.id()).unwrap().get_mut();
118 unsafe {
119 let base = archetype.get_dynamic(ty.id(), 0, 0).unwrap();
120 for i in 0..fill {
121 base.as_ptr().add(i as usize).drop_in_place()
122 }
123 }
124 }
125 }
126 }
127}
128
129pub struct ColumnBatch(pub(crate) Archetype);
131
/// Exclusive writer for the `T` component column of a batch, obtained from
/// [`ColumnBatchBuilder::writer`].
pub struct BatchWriter<'a, T> {
    // The column's shared fill counter. It holds the `u32::MAX` sentinel while
    // this writer is alive and receives the final count when we drop.
    fill_storage: &'a AtomicU32,
    // Elements written to the column so far (including by previous writers).
    fill: u32,
    // Iterator over the column's remaining uninitialized slots.
    storage: core::slice::IterMut<'a, MaybeUninit<T>>,
}
138
139impl<T> BatchWriter<'_, T> {
140 pub fn push(&mut self, x: T) -> Result<(), T> {
142 match self.storage.next() {
143 None => Err(x),
144 Some(slot) => {
145 *slot = MaybeUninit::new(x);
146 self.fill += 1;
147 Ok(())
148 }
149 }
150 }
151
152 pub fn fill(&self) -> u32 {
154 self.fill
155 }
156}
157
impl<T> Drop for BatchWriter<'_, T> {
    fn drop(&mut self) {
        // Retire our `&mut` view of the column before publishing the count —
        // presumably so that a subsequent writer's aliasing `&mut` slice is
        // never created while this one is still live (stacked-borrows/Miri
        // hygiene). NOTE(review): confirm ordering of these two statements is
        // load-bearing before reordering.
        self.storage = core::slice::IterMut::default();
        // Release the column "lock": replace the `u32::MAX` sentinel with the
        // real fill count. Pairs with the Acquire swap in
        // `ColumnBatchBuilder::writer`.
        self.fill_storage.store(self.fill, Ordering::Release);
    }
}
166
/// Error returned by [`ColumnBatchBuilder::build`] when at least one column
/// was not filled to the batch's size.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct BatchIncomplete {
    // Private field keeps the type non-constructible outside this module.
    _opaque: (),
}
172
// `std::error::Error` is only available with the `std` feature; the error has
// no source, so the trait's default methods suffice.
#[cfg(feature = "std")]
impl std::error::Error for BatchIncomplete {}
175
176impl fmt::Display for BatchIncomplete {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.write_str("batch incomplete")
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// A zero-sized batch must reject every write.
    #[test]
    fn empty_batch() {
        let mut ty = ColumnBatchType::new();
        ty.add::<usize>();
        let batch = ty.into_batch(0);
        let mut out = batch.writer::<usize>().unwrap();
        assert!(out.push(42).is_err());
    }

    /// Dropping a writer and reacquiring one resumes where the first stopped.
    #[test]
    fn writer_continues_from_last_fill() {
        let mut ty = ColumnBatchType::new();
        ty.add::<usize>();
        let batch = ty.into_batch(2);
        {
            let mut first = batch.writer::<usize>().unwrap();
            first.push(42).unwrap();
        }

        let mut second = batch.writer::<usize>().unwrap();

        assert_eq!(second.push(42), Ok(()));
        assert_eq!(second.push(42), Err(42));
    }

    /// Writers for distinct columns may coexist and the batch builds cleanly.
    #[test]
    fn concurrent_writers() {
        let mut ty = ColumnBatchType::new();
        ty.add::<usize>();
        ty.add::<u32>();
        let batch = ty.into_batch(2);
        {
            let mut wide = batch.writer::<usize>().unwrap();
            let mut narrow = batch.writer::<u32>().unwrap();
            for i in 0..2 {
                wide.push(i as usize).unwrap();
                narrow.push(i).unwrap();
            }
        }
        batch.build().unwrap();
    }

    /// Requesting a second writer for the same column panics.
    #[test]
    #[should_panic(expected = "writer still exists")]
    fn aliasing_writers() {
        let mut ty = ColumnBatchType::new();
        ty.add::<usize>();
        let batch = ty.into_batch(2);
        let _a = batch.writer::<usize>().unwrap();
        let _b = batch.writer::<usize>().unwrap();
    }
}
237}