cobble_data_structure/
structured_remote_compaction_server.rs1use cobble::{Config, RemoteCompactionServer, Result};
2use std::net::TcpStream;
3use std::sync::Arc;
4
5use crate::structured_db::{
6 structured_merge_operator_resolver, structured_resolvable_operator_ids,
7};
8
9pub struct StructuredRemoteCompactionServer {
15 inner: Arc<RemoteCompactionServer>,
16}
17
18impl StructuredRemoteCompactionServer {
19 pub fn new(config: Config) -> Result<Self> {
20 let server = RemoteCompactionServer::new(config)?;
21 server.set_merge_operator_resolver(
22 structured_merge_operator_resolver(),
23 structured_resolvable_operator_ids(),
24 );
25 Ok(Self {
26 inner: Arc::new(server),
27 })
28 }
29
30 pub fn supported_merge_operator_ids(&self) -> Vec<String> {
31 self.inner.supported_merge_operator_ids()
32 }
33
34 pub fn serve(&self, address: &str) -> Result<()> {
35 self.inner.serve(address)
36 }
37
38 pub fn handle_connection(&self, stream: TcpStream) -> Result<()> {
39 self.inner.handle_connection(stream)
40 }
41
42 pub fn inner(&self) -> &RemoteCompactionServer {
43 &self.inner
44 }
45
46 pub fn close(&self) {
47 self.inner.close()
48 }
49}
50
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StructuredColumnValue;
    use crate::list::{ListConfig, ListRetainMode};
    use cobble::VolumeDescriptor;
    use size::Size;
    use std::net::TcpListener;
    use std::thread;
    use std::time::Duration;
    use uuid::Uuid;

    /// Per-test scratch directory under `/tmp` that is removed when the guard
    /// is dropped.
    ///
    /// Declare it before any server or database handle: locals are dropped in
    /// reverse declaration order, so the directory is removed last, and it is
    /// removed even when an assertion or `unwrap()` panics part-way through.
    struct TempRoot(String);

    impl TempRoot {
        /// Creates a unique path of the form `/tmp/<prefix>_<uuid>`.
        fn new(prefix: &str) -> Self {
            Self(format!("/tmp/{}_{}", prefix, Uuid::new_v4()))
        }

        /// Returns the `file://` URL used for the volume descriptor.
        fn volume_url(&self) -> String {
            format!("file://{}", self.0)
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // Best-effort cleanup: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// The structured server must advertise the list merge operator id.
    #[test]
    fn test_structured_remote_compaction_server_supported_ids() {
        let root = TempRoot::new("ds_remote_ids");
        let config = Config {
            volumes: VolumeDescriptor::single_volume(root.volume_url()),
            compaction_threads: 1,
            ..Config::default()
        };
        let server = StructuredRemoteCompactionServer::new(config).unwrap();

        let ids = server.supported_merge_operator_ids();
        assert!(
            ids.contains(&"cobble.list.v1".to_string()),
            "should include cobble.list.v1, got: {:?}",
            ids
        );
    }

    /// Writes puts and list merges through a database configured with a remote
    /// compaction address, then checks every key is still readable and its
    /// list column is non-empty.
    #[test]
    fn test_structured_remote_compaction_with_list_operator() {
        let root = TempRoot::new("ds_remote_list");

        let server_config = Config {
            volumes: VolumeDescriptor::single_volume(root.volume_url()),
            base_file_size: Size::from_const(128),
            compaction_threads: 1,
            num_columns: 2,
            ..Config::default()
        };
        let server = Arc::new(StructuredRemoteCompactionServer::new(server_config).unwrap());

        // Ask the OS for a free port, then release it so the server can bind
        // the same address itself.
        // NOTE(review): another process could claim the port between the drop
        // and the server's bind; kept as-is because `serve` only takes an
        // address string.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap().to_string();
        drop(listener);

        let serving = Arc::clone(&server);
        let serve_addr = addr.clone();
        let _server_thread = thread::spawn(move || {
            let _ = serving.serve(&serve_addr);
        });
        // Give the server thread time to start listening.
        thread::sleep(Duration::from_millis(100));

        let db_config = Config {
            volumes: VolumeDescriptor::single_volume(root.volume_url()),
            num_columns: 2,
            total_buckets: 1,
            base_file_size: Size::from_const(128),
            compaction_threads: 1,
            l0_file_limit: 2,
            compaction_remote_addr: Some(addr),
            ..Config::default()
        };
        let mut db = crate::StructuredSingleDb::open(db_config).unwrap();
        db.update_schema()
            .add_list_column(
                1,
                ListConfig {
                    max_elements: Some(5),
                    retain_mode: ListRetainMode::Last,
                    preserve_element_ttl: false,
                },
            )
            .commit()
            .unwrap();

        // 30 writes spread over 5 keys: each key gets a plain value in
        // column 0 and a list merge in column 1.
        for i in 0..30u32 {
            let key = format!("key{:03}", i % 5);
            db.put(
                0,
                key.as_bytes(),
                0,
                bytes::Bytes::from(format!("val{}", i)),
            )
            .unwrap();
            db.merge(
                0,
                key.as_bytes(),
                1,
                vec![bytes::Bytes::from(format!("elem{}", i))],
            )
            .unwrap();
        }

        // NOTE(review): the snapshot plus the pause presumably lets background
        // (remote) compaction run before the reads below; confirm against
        // the database implementation.
        let _snap_id = db.snapshot().unwrap();
        thread::sleep(Duration::from_millis(500));

        for i in 0..5u32 {
            let key = format!("key{:03}", i);
            let row = db
                .get(0, key.as_bytes())
                .unwrap()
                .unwrap_or_else(|| panic!("key {} should exist", key));
            match &row[1] {
                Some(StructuredColumnValue::List(elements)) => assert!(
                    !elements.is_empty(),
                    "key {} list column should have elements",
                    key
                ),
                other => panic!("key {} column 1 should be a list, got {:?}", key, other),
            }
        }

        db.close().unwrap();
        server.close();
        // `root` is dropped last and removes the scratch directory.
    }
}