Skip to main content

cobble_data_structure/
structured_remote_compaction_server.rs

1use cobble::{Config, RemoteCompactionServer, Result};
2use std::net::TcpStream;
3use std::sync::Arc;
4
5use crate::structured_db::{
6    structured_merge_operator_resolver, structured_resolvable_operator_ids,
7};
8
9/// A remote compaction server pre-configured to resolve structured data type
10/// merge operators (e.g. list) from request metadata.
11///
12/// This wraps [`RemoteCompactionServer`] and automatically registers the
13/// structured merge operator resolver on construction.
14pub struct StructuredRemoteCompactionServer {
15    inner: Arc<RemoteCompactionServer>,
16}
17
18impl StructuredRemoteCompactionServer {
19    pub fn new(config: Config) -> Result<Self> {
20        let server = RemoteCompactionServer::new(config)?;
21        server.set_merge_operator_resolver(
22            structured_merge_operator_resolver(),
23            structured_resolvable_operator_ids(),
24        );
25        Ok(Self {
26            inner: Arc::new(server),
27        })
28    }
29
30    pub fn supported_merge_operator_ids(&self) -> Vec<String> {
31        self.inner.supported_merge_operator_ids()
32    }
33
34    pub fn serve(&self, address: &str) -> Result<()> {
35        self.inner.serve(address)
36    }
37
38    pub fn handle_connection(&self, stream: TcpStream) -> Result<()> {
39        self.inner.handle_connection(stream)
40    }
41
42    pub fn inner(&self) -> &RemoteCompactionServer {
43        &self.inner
44    }
45
46    pub fn close(&self) {
47        self.inner.close()
48    }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnValue;
    use crate::list::{ListConfig, ListRetainMode};
    use cobble::VolumeDescriptor;
    use size::Size;
    use std::net::TcpListener;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Per-test scratch directory under `/tmp` that is removed on drop.
    ///
    /// Cleaning up in `Drop` (instead of on the last line of the test) means
    /// the directory is also removed when an `unwrap`/`assert!` fails and the
    /// test unwinds early.
    struct ScratchDir {
        path: String,
    }

    impl ScratchDir {
        /// Picks a unique path `/tmp/<prefix>_<uuid>`; nothing is created on
        /// disk here.
        fn new(prefix: &str) -> Self {
            Self {
                path: format!("/tmp/{}_{}", prefix, Uuid::new_v4()),
            }
        }

        /// `file://` URL for this directory, as passed to
        /// `VolumeDescriptor::single_volume`.
        fn volume_url(&self) -> String {
            format!("file://{}", self.path)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created, and a
            // cleanup failure must not mask the real test outcome.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// A freshly constructed server must advertise the list merge operator.
    #[test]
    fn test_structured_remote_compaction_server_supported_ids() {
        // Declared first so it is dropped last, after the server.
        let scratch = ScratchDir::new("ds_remote_ids");
        let config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            compaction_threads: 1,
            ..Config::default()
        };
        let server = StructuredRemoteCompactionServer::new(config).unwrap();
        let ids = server.supported_merge_operator_ids();
        assert!(
            ids.iter().any(|id| id == "cobble.list.v1"),
            "should include cobble.list.v1, got: {:?}",
            ids
        );
    }

    /// Writes puts and list merges through a database configured with
    /// `compaction_remote_addr` pointing at a local server, then checks that
    /// every key still reads back with a non-empty list column.
    #[test]
    fn test_structured_remote_compaction_with_list_operator() {
        // Declared first so it is dropped last, after the db and the server.
        let scratch = ScratchDir::new("ds_remote_list");

        // Set up the remote compaction server
        let server_config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            base_file_size: Size::from_const(128),
            compaction_threads: 1,
            num_columns: 2,
            ..Config::default()
        };
        let server = Arc::new(StructuredRemoteCompactionServer::new(server_config).unwrap());

        // Ask the OS for a free port by binding to port 0, then release it so
        // that serve() can bind the same address. Another process could grab
        // the port in between; that race is inherent to this approach.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);

        // Start server in a background thread using serve()
        let server_clone = Arc::clone(&server);
        let addr_str = addr.to_string();
        let _server_thread = thread::spawn(move || {
            let _ = server_clone.serve(&addr_str);
        });
        // Give the server a moment to bind
        thread::sleep(Duration::from_millis(100));

        // Open a StructuredSingleDb with remote compaction pointing to our server
        let db_config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            num_columns: 2,
            total_buckets: 1,
            base_file_size: Size::from_const(128),
            compaction_threads: 1,
            l0_file_limit: 2,
            compaction_remote_addr: Some(addr.to_string()),
            ..Config::default()
        };
        let mut db = crate::StructuredSingleDb::open(db_config).unwrap();
        db.update_schema()
            .add_list_column(
                1,
                ListConfig {
                    max_elements: Some(5),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                },
            )
            .commit()
            .unwrap();

        // Write enough data to trigger flush and eventually compaction.
        // Keys cycle through key000..key004, so each key receives several
        // puts (column 0) and list merges (column 1).
        for i in 0..30u32 {
            let key = format!("key{:03}", i % 5);
            db.put(
                0,
                key.as_bytes(),
                0,
                bytes::Bytes::from(format!("val{}", i)),
            )
            .unwrap();
            db.merge(
                0,
                key.as_bytes(),
                1,
                vec![bytes::Bytes::from(format!("elem{}", i))],
            )
            .unwrap();
        }

        // Trigger a snapshot to ensure data is flushed
        let _snap_id = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(500));

        // Read back and verify list merge results are correct
        for i in 0..5u32 {
            let key = format!("key{:03}", i);
            let row = db
                .get(0, key.as_bytes())
                .unwrap()
                .unwrap_or_else(|| panic!("key {} should exist", key));
            // Column 1 is a list — elements should be present
            match &row[1] {
                Some(StructuredColumnValue::List(elements)) => assert!(
                    !elements.is_empty(),
                    "key {} list column should have elements",
                    key
                ),
                other => panic!("key {} column 1 should be a list, got {:?}", key, other),
            }
        }

        db.close().unwrap();
        server.close();
    }
}