Skip to main content

mangle_vm/
composite_host.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::Host;
16use std::collections::HashMap;
17
18/// A Host implementation that aggregates multiple sub-hosts.
19///
20/// It routes relation scans and inserts to the appropriate backend based on
21/// explicit routing rules.
22pub struct CompositeHost {
23    hosts: Vec<Box<dyn Host + Send>>,
24
25    /// rel_id -> index in `hosts`
26    routes: HashMap<i32, usize>,
27
28    /// iter_id -> (host_index, real_iter_id)
29    active_iters: HashMap<i32, (usize, i32)>,
30
31    next_iter_id: i32,
32}
33
34impl Default for CompositeHost {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl CompositeHost {
41    pub fn new() -> Self {
42        Self {
43            hosts: Vec::new(),
44            routes: HashMap::new(),
45            active_iters: HashMap::new(),
46            next_iter_id: 1,
47        }
48    }
49
50    /// Adds a sub-host and returns its index.
51    pub fn add_host(&mut self, host: Box<dyn Host + Send>) -> usize {
52        let idx = self.hosts.len();
53        self.hosts.push(host);
54        idx
55    }
56
57    /// Routes a relation to a specific sub-host.
58    pub fn route_relation(&mut self, rel_name: &str, host_index: usize) {
59        let id = hash_name(rel_name);
60        self.routes.insert(id, host_index);
61    }
62}
63
/// Maps a relation name to the `i32` id used as a routing key.
///
/// This is the djb2 string hash: start from 5381 and, for every byte of the
/// name, multiply the running value by 33 and add the byte, all in wrapping
/// 32-bit arithmetic. The final `u32` is reinterpreted as `i32`, so the
/// result may be negative.
fn hash_name(name: &str) -> i32 {
    let digest = name
        .bytes()
        .fold(5381u32, |acc, byte| acc.wrapping_mul(33).wrapping_add(u32::from(byte)));
    digest as i32
}
71
72impl Host for CompositeHost {
73    fn scan_start(&mut self, rel_id: i32) -> i32 {
74        if let Some(&h_idx) = self.routes.get(&rel_id) {
75            let real_id = self.hosts[h_idx].scan_start(rel_id);
76            if real_id != 0 {
77                let id = self.next_iter_id;
78                self.next_iter_id += 1;
79                self.active_iters.insert(id, (h_idx, real_id));
80                return id;
81            }
82        }
83        0
84    }
85
86    fn scan_next(&mut self, iter_id: i32) -> i32 {
87        if let Some(&(h_idx, real_id)) = self.active_iters.get(&iter_id) {
88            let ptr = self.hosts[h_idx].scan_next(real_id);
89            if ptr == 0 {
90                return 0;
91            }
92
93            // Tag pointer with host index (using top 6 bits)
94            // This assumes sub-hosts don't use more than 26 bits for their pointers.
95            return ptr | ((h_idx as i32 + 1) << 26);
96        }
97        0
98    }
99
100    fn get_col(&mut self, tuple_ptr: i32, col_idx: i32) -> i64 {
101        let h_idx_plus_1 = (tuple_ptr >> 26) & 0x3F;
102        if h_idx_plus_1 == 0 {
103            return 0;
104        }
105
106        let h_idx = (h_idx_plus_1 - 1) as usize;
107        let real_ptr = tuple_ptr & !(0x3F << 26);
108
109        if h_idx < self.hosts.len() {
110            return self.hosts[h_idx].get_col(real_ptr, col_idx);
111        }
112        0
113    }
114
115    fn insert(&mut self, rel_id: i32, val: i64) {
116        if let Some(&h_idx) = self.routes.get(&rel_id) {
117            self.hosts[h_idx].insert(rel_id, val);
118        }
119    }
120
121    fn scan_delta_start(&mut self, rel_id: i32) -> i32 {
122        if let Some(&h_idx) = self.routes.get(&rel_id) {
123            let real_id = self.hosts[h_idx].scan_delta_start(rel_id);
124            if real_id != 0 {
125                let id = self.next_iter_id;
126                self.next_iter_id += 1;
127                self.active_iters.insert(id, (h_idx, real_id));
128                return id;
129            }
130        }
131        0
132    }
133
134    fn scan_index_start(&mut self, rel_id: i32, col_idx: i32, val: i64) -> i32 {
135        if let Some(&h_idx) = self.routes.get(&rel_id) {
136            let real_id = self.hosts[h_idx].scan_index_start(rel_id, col_idx, val);
137            if real_id != 0 {
138                let id = self.next_iter_id;
139                self.next_iter_id += 1;
140                self.active_iters.insert(id, (h_idx, real_id));
141                return id;
142            }
143        }
144        0
145    }
146
147    fn scan_aggregate_start(&mut self, rel_id: i32, description: Vec<i32>) -> i32 {
148        if let Some(&h_idx) = self.routes.get(&rel_id) {
149            let real_id = self.hosts[h_idx].scan_aggregate_start(rel_id, description);
150            if real_id != 0 {
151                let id = self.next_iter_id;
152                self.next_iter_id += 1;
153                self.active_iters.insert(id, (h_idx, real_id));
154                return id;
155            }
156        }
157        0
158    }
159
160    fn merge_deltas(&mut self) -> i32 {
161        let mut changes = 0;
162        for host in &mut self.hosts {
163            changes |= host.merge_deltas();
164        }
165        changes
166    }
167
168    fn debuglog(&mut self, _val: i64) {}
169}