Skip to main content

mangle_vm/
composite_host.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{Host, HostVal};
16use std::collections::HashMap;
17
18/// A Host implementation that aggregates multiple sub-hosts.
19///
20/// It routes relation scans and inserts to the appropriate backend based on
21/// explicit routing rules. Values from sub-hosts are remapped through a
22/// composite value table so that HostVal handles remain unique across sub-hosts.
23pub struct CompositeHost {
24    hosts: Vec<Box<dyn Host + Send>>,
25
26    /// rel_id -> index in `hosts`
27    routes: HashMap<i32, usize>,
28
29    /// iter_id -> (host_index, real_iter_id)
30    active_iters: HashMap<i32, (usize, i32)>,
31
32    next_iter_id: i32,
33
34    /// Maps composite HostVal -> (host_index, sub_host HostVal).
35    val_map: Vec<(usize, HostVal)>,
36}
37
38impl Default for CompositeHost {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl CompositeHost {
45    pub fn new() -> Self {
46        Self {
47            hosts: Vec::new(),
48            routes: HashMap::new(),
49            active_iters: HashMap::new(),
50            next_iter_id: 1,
51            val_map: Vec::new(),
52        }
53    }
54
55    /// Adds a sub-host and returns its index.
56    pub fn add_host(&mut self, host: Box<dyn Host + Send>) -> usize {
57        let idx = self.hosts.len();
58        self.hosts.push(host);
59        idx
60    }
61
62    /// Routes a relation to a specific sub-host.
63    pub fn route_relation(&mut self, rel_name: &str, host_index: usize) {
64        let id = hash_name(rel_name);
65        self.routes.insert(id, host_index);
66    }
67
68    /// Wraps a sub-host HostVal into a composite HostVal.
69    fn wrap(&mut self, host_idx: usize, sub_hv: HostVal) -> HostVal {
70        let composite_id = self.val_map.len() as u32;
71        self.val_map.push((host_idx, sub_hv));
72        HostVal(composite_id)
73    }
74
75    /// Unwraps a composite HostVal into (host_index, sub_host HostVal).
76    fn unwrap(&self, hv: HostVal) -> (usize, HostVal) {
77        self.val_map[hv.0 as usize]
78    }
79
80    /// The default host index (0) for value operations.
81    fn default_host(&self) -> usize {
82        0
83    }
84}
85
/// Hashes a relation name into the `i32` id used as a routing key.
///
/// This is a djb2-style hash: start from 5381 and fold every byte of `name`
/// in as `h * 33 + byte` using wrapping `u32` arithmetic. The final `u32` is
/// reinterpreted as `i32`, so the result may be negative.
fn hash_name(name: &str) -> i32 {
    let folded = name.bytes().fold(5381u32, |acc, byte| {
        // `acc * 33` is the same value as `(acc << 5) + acc` modulo 2^32.
        acc.wrapping_mul(33).wrapping_add(u32::from(byte))
    });
    folded as i32
}
93
94impl Host for CompositeHost {
95    fn scan_start(&mut self, rel_id: i32) -> i32 {
96        if let Some(&h_idx) = self.routes.get(&rel_id) {
97            let real_id = self.hosts[h_idx].scan_start(rel_id);
98            if real_id != 0 {
99                let id = self.next_iter_id;
100                self.next_iter_id += 1;
101                self.active_iters.insert(id, (h_idx, real_id));
102                return id;
103            }
104        }
105        0
106    }
107
108    fn scan_next(&mut self, iter_id: i32) -> i32 {
109        if let Some(&(h_idx, real_id)) = self.active_iters.get(&iter_id) {
110            let ptr = self.hosts[h_idx].scan_next(real_id);
111            if ptr == 0 {
112                return 0;
113            }
114            // Tag pointer with host index (using top 6 bits)
115            return ptr | ((h_idx as i32 + 1) << 26);
116        }
117        0
118    }
119
120    fn get_col(&mut self, tuple_ptr: i32, col_idx: i32) -> HostVal {
121        let h_idx_plus_1 = (tuple_ptr >> 26) & 0x3F;
122        if h_idx_plus_1 == 0 {
123            return HostVal(0);
124        }
125
126        let h_idx = (h_idx_plus_1 - 1) as usize;
127        let real_ptr = tuple_ptr & !(0x3F << 26);
128
129        if h_idx < self.hosts.len() {
130            let sub_hv = self.hosts[h_idx].get_col(real_ptr, col_idx);
131            return self.wrap(h_idx, sub_hv);
132        }
133        HostVal(0)
134    }
135
136    fn insert_begin(&mut self, rel_id: i32) {
137        if let Some(&h_idx) = self.routes.get(&rel_id) {
138            self.hosts[h_idx].insert_begin(rel_id);
139        }
140    }
141
142    fn insert_push(&mut self, val: HostVal) {
143        let (h_idx, sub_hv) = self.unwrap(val);
144        self.hosts[h_idx].insert_push(sub_hv);
145    }
146
147    fn insert_end(&mut self) {
148        // End insert on all hosts that might have a pending insert.
149        for host in &mut self.hosts {
150            host.insert_end();
151        }
152    }
153
154    fn scan_delta_start(&mut self, rel_id: i32) -> i32 {
155        if let Some(&h_idx) = self.routes.get(&rel_id) {
156            let real_id = self.hosts[h_idx].scan_delta_start(rel_id);
157            if real_id != 0 {
158                let id = self.next_iter_id;
159                self.next_iter_id += 1;
160                self.active_iters.insert(id, (h_idx, real_id));
161                return id;
162            }
163        }
164        0
165    }
166
167    fn scan_index_start(&mut self, rel_id: i32, col_idx: i32, val: HostVal) -> i32 {
168        if let Some(&h_idx) = self.routes.get(&rel_id) {
169            let (_, sub_hv) = self.unwrap(val);
170            let real_id = self.hosts[h_idx].scan_index_start(rel_id, col_idx, sub_hv);
171            if real_id != 0 {
172                let id = self.next_iter_id;
173                self.next_iter_id += 1;
174                self.active_iters.insert(id, (h_idx, real_id));
175                return id;
176            }
177        }
178        0
179    }
180
181    fn scan_aggregate_start(&mut self, rel_id: i32, description: Vec<i32>) -> i32 {
182        if let Some(&h_idx) = self.routes.get(&rel_id) {
183            let real_id = self.hosts[h_idx].scan_aggregate_start(rel_id, description);
184            if real_id != 0 {
185                let id = self.next_iter_id;
186                self.next_iter_id += 1;
187                self.active_iters.insert(id, (h_idx, real_id));
188                return id;
189            }
190        }
191        0
192    }
193
194    fn merge_deltas(&mut self) -> i32 {
195        let mut changes = 0;
196        for host in &mut self.hosts {
197            changes |= host.merge_deltas();
198        }
199        changes
200    }
201
202    // --- Constants: delegate to default host ---
203
204    fn const_number(&mut self, n: i64) -> HostVal {
205        let h = self.default_host();
206        let sub_hv = self.hosts[h].const_number(n);
207        self.wrap(h, sub_hv)
208    }
209
210    fn const_float(&mut self, bits: i64) -> HostVal {
211        let h = self.default_host();
212        let sub_hv = self.hosts[h].const_float(bits);
213        self.wrap(h, sub_hv)
214    }
215
216    fn const_string(&mut self, id: i32) -> HostVal {
217        let h = self.default_host();
218        let sub_hv = self.hosts[h].const_string(id);
219        self.wrap(h, sub_hv)
220    }
221
222    fn const_name(&mut self, id: i32) -> HostVal {
223        let h = self.default_host();
224        let sub_hv = self.hosts[h].const_name(id);
225        self.wrap(h, sub_hv)
226    }
227
228    fn const_time(&mut self, nanos: i64) -> HostVal {
229        let h = self.default_host();
230        let sub_hv = self.hosts[h].const_time(nanos);
231        self.wrap(h, sub_hv)
232    }
233
234    fn const_duration(&mut self, nanos: i64) -> HostVal {
235        let h = self.default_host();
236        let sub_hv = self.hosts[h].const_duration(nanos);
237        self.wrap(h, sub_hv)
238    }
239
240    // --- Arithmetic: delegate to host of first operand ---
241
242    fn val_add(&mut self, a: HostVal, b: HostVal) -> HostVal {
243        let (h, a_sub) = self.unwrap(a);
244        let (_, b_sub) = self.unwrap(b);
245        let sub_hv = self.hosts[h].val_add(a_sub, b_sub);
246        self.wrap(h, sub_hv)
247    }
248
249    fn val_sub(&mut self, a: HostVal, b: HostVal) -> HostVal {
250        let (h, a_sub) = self.unwrap(a);
251        let (_, b_sub) = self.unwrap(b);
252        let sub_hv = self.hosts[h].val_sub(a_sub, b_sub);
253        self.wrap(h, sub_hv)
254    }
255
256    fn val_mul(&mut self, a: HostVal, b: HostVal) -> HostVal {
257        let (h, a_sub) = self.unwrap(a);
258        let (_, b_sub) = self.unwrap(b);
259        let sub_hv = self.hosts[h].val_mul(a_sub, b_sub);
260        self.wrap(h, sub_hv)
261    }
262
263    fn val_div(&mut self, a: HostVal, b: HostVal) -> HostVal {
264        let (h, a_sub) = self.unwrap(a);
265        let (_, b_sub) = self.unwrap(b);
266        let sub_hv = self.hosts[h].val_div(a_sub, b_sub);
267        self.wrap(h, sub_hv)
268    }
269
270    fn val_sqrt(&mut self, a: HostVal) -> HostVal {
271        let (h, a_sub) = self.unwrap(a);
272        let sub_hv = self.hosts[h].val_sqrt(a_sub);
273        self.wrap(h, sub_hv)
274    }
275
276    // --- Comparisons ---
277
278    fn val_eq(&mut self, a: HostVal, b: HostVal) -> i32 {
279        let (h, a_sub) = self.unwrap(a);
280        let (_, b_sub) = self.unwrap(b);
281        self.hosts[h].val_eq(a_sub, b_sub)
282    }
283
284    fn val_neq(&mut self, a: HostVal, b: HostVal) -> i32 {
285        let (h, a_sub) = self.unwrap(a);
286        let (_, b_sub) = self.unwrap(b);
287        self.hosts[h].val_neq(a_sub, b_sub)
288    }
289
290    fn val_lt(&mut self, a: HostVal, b: HostVal) -> i32 {
291        let (h, a_sub) = self.unwrap(a);
292        let (_, b_sub) = self.unwrap(b);
293        self.hosts[h].val_lt(a_sub, b_sub)
294    }
295
296    fn val_le(&mut self, a: HostVal, b: HostVal) -> i32 {
297        let (h, a_sub) = self.unwrap(a);
298        let (_, b_sub) = self.unwrap(b);
299        self.hosts[h].val_le(a_sub, b_sub)
300    }
301
302    fn val_gt(&mut self, a: HostVal, b: HostVal) -> i32 {
303        let (h, a_sub) = self.unwrap(a);
304        let (_, b_sub) = self.unwrap(b);
305        self.hosts[h].val_gt(a_sub, b_sub)
306    }
307
308    fn val_ge(&mut self, a: HostVal, b: HostVal) -> i32 {
309        let (h, a_sub) = self.unwrap(a);
310        let (_, b_sub) = self.unwrap(b);
311        self.hosts[h].val_ge(a_sub, b_sub)
312    }
313
314    fn str_concat(&mut self, a: HostVal, b: HostVal) -> HostVal {
315        let (h, a_sub) = self.unwrap(a);
316        let (_, b_sub) = self.unwrap(b);
317        let sub_hv = self.hosts[h].str_concat(a_sub, b_sub);
318        self.wrap(h, sub_hv)
319    }
320
321    fn str_replace(&mut self, s: HostVal, old: HostVal, new: HostVal, count: HostVal) -> HostVal {
322        let (h, s_sub) = self.unwrap(s);
323        let (_, old_sub) = self.unwrap(old);
324        let (_, new_sub) = self.unwrap(new);
325        let (_, count_sub) = self.unwrap(count);
326        let sub_hv = self.hosts[h].str_replace(s_sub, old_sub, new_sub, count_sub);
327        self.wrap(h, sub_hv)
328    }
329
330    fn val_to_string(&mut self, val: HostVal) -> HostVal {
331        let (h, sub_hv) = self.unwrap(val);
332        let result = self.hosts[h].val_to_string(sub_hv);
333        self.wrap(h, result)
334    }
335
336    fn compound_begin(&mut self, kind: i32) {
337        let h = self.default_host();
338        self.hosts[h].compound_begin(kind);
339    }
340
341    fn compound_push(&mut self, val: HostVal) {
342        let (h, sub_hv) = self.unwrap(val);
343        self.hosts[h].compound_push(sub_hv);
344    }
345
346    fn compound_end(&mut self) -> HostVal {
347        let h = self.default_host();
348        let sub_hv = self.hosts[h].compound_end();
349        self.wrap(h, sub_hv)
350    }
351
352    fn compound_get(&mut self, compound: HostVal, key: HostVal) -> HostVal {
353        let (h, c_sub) = self.unwrap(compound);
354        let (_, k_sub) = self.unwrap(key);
355        let sub_hv = self.hosts[h].compound_get(c_sub, k_sub);
356        self.wrap(h, sub_hv)
357    }
358
359    fn compound_len(&mut self, compound: HostVal) -> HostVal {
360        let (h, c_sub) = self.unwrap(compound);
361        let sub_hv = self.hosts[h].compound_len(c_sub);
362        self.wrap(h, sub_hv)
363    }
364
365    fn pair_first(&mut self, compound: HostVal) -> HostVal {
366        let (h, c_sub) = self.unwrap(compound);
367        let sub_hv = self.hosts[h].pair_first(c_sub);
368        self.wrap(h, sub_hv)
369    }
370
371    fn pair_second(&mut self, compound: HostVal) -> HostVal {
372        let (h, c_sub) = self.unwrap(compound);
373        let sub_hv = self.hosts[h].pair_second(c_sub);
374        self.wrap(h, sub_hv)
375    }
376
377    fn debuglog(&mut self, val: HostVal) {
378        let (h, sub_hv) = self.unwrap(val);
379        self.hosts[h].debuglog(sub_hv);
380    }
381}