cobble_data_structure/
structured_remote_compaction_server.rs1use cobble::{Config, RemoteCompactionServer, Result};
2use std::net::TcpStream;
3use std::sync::Arc;
4
5use crate::structured_db::{
6 structured_merge_operator_resolver, structured_resolvable_operator_ids,
7};
8
9pub struct StructuredRemoteCompactionServer {
15 inner: Arc<RemoteCompactionServer>,
16}
17
18impl StructuredRemoteCompactionServer {
19 pub fn new(config: Config) -> Result<Self> {
20 let server = RemoteCompactionServer::new(config)?;
21 server.set_merge_operator_resolver(
22 structured_merge_operator_resolver(),
23 structured_resolvable_operator_ids(),
24 );
25 Ok(Self {
26 inner: Arc::new(server),
27 })
28 }
29
30 pub fn supported_merge_operator_ids(&self) -> Vec<String> {
31 self.inner.supported_merge_operator_ids()
32 }
33
34 pub fn serve(&self, address: &str) -> Result<()> {
35 self.inner.serve(address)
36 }
37
38 pub fn handle_connection(&self, stream: TcpStream) -> Result<()> {
39 self.inner.handle_connection(stream)
40 }
41
42 pub fn inner(&self) -> &RemoteCompactionServer {
43 &self.inner
44 }
45
46 pub fn close(&self) {
47 self.inner.close()
48 }
49}
50
51#[cfg(test)]
52mod tests {
53 use super::*;
54 use crate::StructuredColumnValue;
55 use crate::list::{ListConfig, ListRetainMode};
56 use cobble::VolumeDescriptor;
57 use std::net::TcpListener;
58 use std::thread;
59 use std::time::Duration;
60 use uuid::Uuid;
61
62 #[test]
63 fn test_structured_remote_compaction_server_supported_ids() {
64 let root = format!("/tmp/ds_remote_ids_{}", Uuid::new_v4());
65 let config = Config {
66 volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
67 compaction_threads: 1,
68 ..Config::default()
69 };
70 let server = StructuredRemoteCompactionServer::new(config).unwrap();
71 let ids = server.supported_merge_operator_ids();
72 assert!(
73 ids.contains(&"cobble.list.v1".to_string()),
74 "should include cobble.list.v1, got: {:?}",
75 ids
76 );
77 let _ = std::fs::remove_dir_all(&root);
78 }
79
80 #[test]
81 fn test_structured_remote_compaction_with_list_operator() {
82 let root = format!("/tmp/ds_remote_list_{}", Uuid::new_v4());
83
84 let server_config = Config {
86 volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
87 base_file_size: 128,
88 compaction_threads: 1,
89 num_columns: 2,
90 ..Config::default()
91 };
92 let server = Arc::new(StructuredRemoteCompactionServer::new(server_config).unwrap());
93
94 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
96 let addr = listener.local_addr().unwrap();
97 drop(listener);
98 let server_clone = Arc::clone(&server);
99 let addr_str = addr.to_string();
100 let _server_thread = thread::spawn(move || {
101 let _ = server_clone.serve(&addr_str);
102 });
103 thread::sleep(Duration::from_millis(100));
105
106 let db_config = Config {
108 volumes: VolumeDescriptor::single_volume(format!("file://{}", root)),
109 num_columns: 2,
110 total_buckets: 1,
111 base_file_size: 128,
112 compaction_threads: 1,
113 l0_file_limit: 2,
114 compaction_remote_addr: Some(addr.to_string()),
115 ..Config::default()
116 };
117 let mut db = crate::StructuredSingleDb::open(db_config).unwrap();
118 db.update_schema()
119 .add_list_column(
120 1,
121 ListConfig {
122 max_elements: Some(5),
123 retain_mode: ListRetainMode::Last,
124 preserve_element_ttl: false,
125 },
126 )
127 .commit()
128 .unwrap();
129
130 for i in 0..30u32 {
132 let key = format!("key{:03}", i % 5);
133 db.put(
134 0,
135 key.as_bytes(),
136 0,
137 bytes::Bytes::from(format!("val{}", i)),
138 )
139 .unwrap();
140 db.merge(
141 0,
142 key.as_bytes(),
143 1,
144 vec![bytes::Bytes::from(format!("elem{}", i))],
145 )
146 .unwrap();
147 }
148
149 let _snap_id = db.snapshot().unwrap();
151 thread::sleep(Duration::from_millis(500));
152
153 for i in 0..5u32 {
155 let key = format!("key{:03}", i);
156 let row = db.get(0, key.as_bytes()).unwrap();
157 assert!(row.is_some(), "key {} should exist", key);
158 let row = row.unwrap();
159 if let Some(StructuredColumnValue::List(elements)) = &row[1] {
161 assert!(
162 !elements.is_empty(),
163 "key {} list column should have elements",
164 key
165 );
166 } else {
167 panic!("key {} column 1 should be a list, got {:?}", key, row[1]);
168 }
169 }
170
171 db.close().unwrap();
172 server.close();
173 let _ = std::fs::remove_dir_all(&root);
174 }
175}