// cobble_data_structure/structured_remote_compaction_server.rs

1use cobble::{Config, RemoteCompactionServer, Result};
2use std::net::TcpStream;
3use std::sync::Arc;
4
5use crate::structured_db::{
6    structured_merge_operator_resolver, structured_resolvable_operator_ids,
7};
8
9/// A remote compaction server pre-configured to resolve structured data type
10/// merge operators (e.g. list) from request metadata.
11///
12/// This wraps [`RemoteCompactionServer`] and automatically registers the
13/// structured merge operator resolver on construction.
14pub struct StructuredRemoteCompactionServer {
15    inner: Arc<RemoteCompactionServer>,
16}
17
18impl StructuredRemoteCompactionServer {
19    pub fn new(config: Config) -> Result<Self> {
20        let server = RemoteCompactionServer::new(config)?;
21        server.set_merge_operator_resolver(
22            structured_merge_operator_resolver(),
23            structured_resolvable_operator_ids(),
24        );
25        Ok(Self {
26            inner: Arc::new(server),
27        })
28    }
29
30    pub fn supported_merge_operator_ids(&self) -> Vec<String> {
31        self.inner.supported_merge_operator_ids()
32    }
33
34    pub fn serve(&self, address: &str) -> Result<()> {
35        self.inner.serve(address)
36    }
37
38    pub fn handle_connection(&self, stream: TcpStream) -> Result<()> {
39        self.inner.handle_connection(stream)
40    }
41
42    pub fn inner(&self) -> &RemoteCompactionServer {
43        &self.inner
44    }
45
46    pub fn close(&self) {
47        self.inner.close()
48    }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnValue;
    use crate::list::{ListConfig, ListRetainMode};
    use cobble::VolumeDescriptor;
    use std::net::TcpListener;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Uniquely named scratch directory under `/tmp` that is removed when the
    /// guard is dropped.
    ///
    /// Tests declare the guard first so it is dropped last: the directory is
    /// cleaned up after every other local has been dropped, and also when an
    /// assertion panics part-way through the test.
    struct ScratchDir {
        path: String,
    }

    impl ScratchDir {
        /// Picks a fresh `/tmp/<prefix>_<uuid>` path; the directory itself is
        /// not created here.
        fn new(prefix: &str) -> Self {
            Self {
                path: format!("/tmp/{}_{}", prefix, Uuid::new_v4()),
            }
        }

        /// `file://` URL for this directory, as passed to
        /// `VolumeDescriptor::single_volume`.
        fn volume_url(&self) -> String {
            format!("file://{}", self.path)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    #[test]
    fn test_structured_remote_compaction_server_supported_ids() {
        let scratch = ScratchDir::new("ds_remote_ids");
        let config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            compaction_threads: 1,
            ..Config::default()
        };
        let server = StructuredRemoteCompactionServer::new(config).unwrap();
        let ids = server.supported_merge_operator_ids();
        assert!(
            ids.contains(&"cobble.list.v1".to_string()),
            "should include cobble.list.v1, got: {:?}",
            ids
        );
    }

    #[test]
    fn test_structured_remote_compaction_with_list_operator() {
        let scratch = ScratchDir::new("ds_remote_list");

        // Set up the remote compaction server
        let server_config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            base_file_size: 128,
            compaction_threads: 1,
            num_columns: 2,
            ..Config::default()
        };
        let server = Arc::new(StructuredRemoteCompactionServer::new(server_config).unwrap());

        // Reserve a free port by binding to port 0, then release it so that
        // serve() can bind the same address itself.
        // NOTE(review): another process could grab the port between the drop
        // and serve(); serve() only accepts an address string, so the window
        // cannot be closed from here.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);

        // Start server in a background thread using serve()
        let server_clone = Arc::clone(&server);
        let addr_str = addr.to_string();
        let _server_thread = thread::spawn(move || {
            let _ = server_clone.serve(&addr_str);
        });
        // Give the server a moment to bind
        thread::sleep(Duration::from_millis(100));

        // Open a StructuredSingleDb with remote compaction pointing to our server
        let db_config = Config {
            volumes: VolumeDescriptor::single_volume(scratch.volume_url()),
            num_columns: 2,
            total_buckets: 1,
            base_file_size: 128,
            compaction_threads: 1,
            l0_file_limit: 2,
            compaction_remote_addr: Some(addr.to_string()),
            ..Config::default()
        };
        let mut db = crate::StructuredSingleDb::open(db_config).unwrap();
        db.update_schema()
            .add_list_column(
                1,
                ListConfig {
                    max_elements: Some(5),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                },
            )
            .commit()
            .unwrap();

        // Write enough data to trigger flush and eventually compaction:
        // 30 iterations spread over 5 distinct keys.
        for i in 0..30u32 {
            let key = format!("key{:03}", i % 5);
            db.put(
                0,
                key.as_bytes(),
                0,
                bytes::Bytes::from(format!("val{}", i)),
            )
            .unwrap();
            db.merge(
                0,
                key.as_bytes(),
                1,
                vec![bytes::Bytes::from(format!("elem{}", i))],
            )
            .unwrap();
        }

        // Trigger a snapshot to ensure data is flushed
        let _snap_id = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(500));

        // Read back and verify list merge results are correct
        for i in 0..5u32 {
            let key = format!("key{:03}", i);
            let Some(row) = db.get(0, key.as_bytes()).unwrap() else {
                panic!("key {} should exist", key);
            };
            // Column 1 is a list — elements should be present
            match &row[1] {
                Some(StructuredColumnValue::List(elements)) => assert!(
                    !elements.is_empty(),
                    "key {} list column should have elements",
                    key
                ),
                other => panic!("key {} column 1 should be a list, got {:?}", key, other),
            }
        }

        db.close().unwrap();
        server.close();
        // `scratch` was declared first, so it is dropped last and removes the
        // directory after `db` and `server` have been dropped.
    }
}
175}