1use super::ColumnStore;
2use crate::store::{
3 catalog::ColumnCatalog,
4 descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorPageHeader},
5};
6use arrow::{
7 array::{StringBuilder, UInt64Builder},
8 datatypes::{DataType, Field, Schema},
9 record_batch::RecordBatch,
10 util::pretty,
11};
12use llkv_storage::{
13 constants::CATALOG_ROOT_PKEY,
14 pager::{BatchGet, GetResult, Pager},
15 types::PhysicalKey,
16};
17use std::collections::HashMap;
18use std::sync::Arc;
19
/// Debug-rendering helpers for inspecting a [`ColumnStore`]'s physical layout.
pub trait ColumnStoreDebug {
    /// Renders the catalog, each column descriptor, its descriptor pages,
    /// and their chunks as a pretty-printed Arrow table (one row per
    /// storage object), returned as a `String`.
    fn render_storage_as_formatted_string(&self) -> String;

    /// Renders the same storage graph in Graphviz DOT format.
    /// `batch_colors` maps a physical key to a batch index that selects the
    /// node's fill color; keys absent from the map default to batch 0.
    fn render_storage_as_dot(&self, batch_colors: &HashMap<PhysicalKey, usize>) -> String;
}
29
30pub fn discover_all_pks<P: Pager>(pager: &P) -> Vec<PhysicalKey> {
32 let mut out = Vec::new();
33 out.push(CATALOG_ROOT_PKEY);
34
35 if let Some(GetResult::Raw {
36 bytes: cat_blob, ..
37 }) = pager
38 .batch_get(&[BatchGet::Raw {
39 key: CATALOG_ROOT_PKEY,
40 }])
41 .unwrap()
42 .pop()
43 {
44 let cat = ColumnCatalog::from_bytes(cat_blob.as_ref()).unwrap();
45 for (_fid, desc_pk) in cat.map.iter() {
46 out.push(*desc_pk);
47
48 let desc_blob = pager
50 .batch_get(&[BatchGet::Raw { key: *desc_pk }])
51 .unwrap()
52 .pop()
53 .and_then(|r| match r {
54 GetResult::Raw { bytes, .. } => Some(bytes),
55 _ => None,
56 })
57 .unwrap();
58 let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
59 let mut page_pk = desc.head_page_pk;
60 while page_pk != 0 {
61 out.push(page_pk);
62 let page_blob = pager
63 .batch_get(&[BatchGet::Raw { key: page_pk }])
64 .unwrap()
65 .pop()
66 .and_then(|r| match r {
67 GetResult::Raw { bytes, .. } => Some(bytes),
68 _ => None,
69 })
70 .unwrap();
71 let bytes = page_blob.as_ref();
72 let hdr_sz = DescriptorPageHeader::DISK_SIZE;
73 let hd = DescriptorPageHeader::from_le_bytes(&bytes[..hdr_sz]);
74
75 for i in 0..(hd.entry_count as usize) {
77 let off = hdr_sz + i * ChunkMetadata::DISK_SIZE;
78 let end = off + ChunkMetadata::DISK_SIZE;
79 let meta = ChunkMetadata::from_le_bytes(&bytes[off..end]);
80 out.push(meta.chunk_pk);
81 if meta.value_order_perm_pk != 0 {
82 out.push(meta.value_order_perm_pk);
83 }
84 }
85
86 page_pk = hd.next_page_pk;
87 }
88 }
89 }
90
91 out.sort_unstable();
92 out.dedup();
93 out
94}
95
/// Maps a batch index to a Graphviz fill-color name.
///
/// The first five batches get distinct colors; every later batch shares
/// "lightgray".
fn color_for_batch(b: usize) -> &'static str {
    const PALETTE: [&str; 5] = ["white", "lightskyblue", "palegreen", "khaki", "lightpink"];
    PALETTE.get(b).copied().unwrap_or("lightgray")
}
106
impl<P: Pager> ColumnStoreDebug for ColumnStore<P> {
    /// Renders the storage layout as a pretty-printed Arrow table.
    ///
    /// One row is appended per storage object, in depth-first traversal
    /// order: the catalog root, then per field its descriptor, each
    /// descriptor page, and each chunk on that page. Pager failures while
    /// reading a descriptor or page cause that subtree to be skipped rather
    /// than panicking.
    fn render_storage_as_formatted_string(&self) -> String {
        // One Arrow builder per output column.
        let mut type_builder = StringBuilder::new();
        let mut logical_id_builder = StringBuilder::new();
        let mut pk_builder = UInt64Builder::new();
        let mut details_builder = StringBuilder::new();

        // LogicalID is nullable: only field rows carry one.
        let schema = Arc::new(Schema::new(vec![
            Field::new("ObjectType", DataType::Utf8, false),
            Field::new("LogicalID", DataType::Utf8, true),
            Field::new("PhysicalKey", DataType::UInt64, false),
            Field::new("Details", DataType::Utf8, false),
        ]));

        let catalog = self.catalog.read().unwrap();

        // Root row: the catalog itself.
        type_builder.append_value("Catalog");
        logical_id_builder.append_null();
        pk_builder.append_value(CATALOG_ROOT_PKEY);
        details_builder.append_value(format!("{} entries", catalog.map.len()));

        for (fid, desc_pk) in catalog.map.iter() {
            // Fetch the column descriptor; skip the field entirely if the
            // pager errors or the result is not a raw blob.
            if let Some(GetResult::Raw {
                bytes: desc_blob, ..
            }) = self
                .pager
                .batch_get(&[BatchGet::Raw { key: *desc_pk }])
                .ok()
                .and_then(|mut r| r.pop())
            {
                let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());

                type_builder.append_value(" L Field");
                logical_id_builder.append_value(format!("{:?}", fid));
                pk_builder.append_value(*desc_pk);
                details_builder.append_value(format!(
                    "Rows: {}, Chunks: {}",
                    desc.total_row_count, desc.total_chunk_count
                ));

                // Walk the singly linked descriptor-page chain; pk 0
                // terminates the list.
                let mut page_pk = desc.head_page_pk;
                while page_pk != 0 {
                    if let Some(GetResult::Raw {
                        bytes: page_blob, ..
                    }) = self
                        .pager
                        .batch_get(&[BatchGet::Raw { key: page_pk }])
                        .ok()
                        .and_then(|mut r| r.pop())
                    {
                        let bytes = page_blob.as_ref();
                        let hdr_sz = DescriptorPageHeader::DISK_SIZE;
                        let hd = DescriptorPageHeader::from_le_bytes(&bytes[..hdr_sz]);

                        type_builder.append_value(" L Page");
                        logical_id_builder.append_null();
                        pk_builder.append_value(page_pk);
                        details_builder.append_value(format!("Entries: {}", hd.entry_count));

                        // Chunk metadata entries are packed immediately after
                        // the page header at fixed DISK_SIZE strides.
                        for i in 0..(hd.entry_count as usize) {
                            let off = hdr_sz + i * ChunkMetadata::DISK_SIZE;
                            let end = off + ChunkMetadata::DISK_SIZE;
                            let meta = ChunkMetadata::from_le_bytes(&bytes[off..end]);

                            type_builder.append_value(" L Chunk");
                            logical_id_builder.append_null();
                            pk_builder.append_value(meta.chunk_pk);
                            details_builder.append_value(format!(
                                "Rows: {}, Est. Bytes: {}",
                                meta.row_count, meta.serialized_bytes
                            ));
                        }
                        page_pk = hd.next_page_pk;
                    } else {
                        // Unreadable page: stop walking this column's chain.
                        page_pk = 0;
                    }
                }
            }
        }

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(type_builder.finish()),
                Arc::new(logical_id_builder.finish()),
                Arc::new(pk_builder.finish()),
                Arc::new(details_builder.finish()),
            ],
        )
        .unwrap();

        pretty::pretty_format_batches(&[batch]).unwrap().to_string()
    }

    /// Renders the storage graph in Graphviz DOT format.
    ///
    /// Every storage object becomes a filled node named `n<pk>`; edges
    /// follow catalog -> descriptor -> page -> chunk/perm links, with
    /// page -> page edges along each descriptor chain. `batch_colors` maps
    /// a physical key to the batch index that picks its fill color
    /// (unmapped keys default to batch 0), and a legend subgraph enumerates
    /// all batches up to the maximum index present.
    ///
    /// NOTE(review): unlike `render_storage_as_formatted_string`, pager
    /// errors here panic via `unwrap` instead of skipping — confirm this
    /// asymmetry is intentional.
    fn render_storage_as_dot(&self, batch_colors: &HashMap<PhysicalKey, usize>) -> String {
        use std::fmt::Write;
        let pager = &self.pager;
        let mut s = String::new();

        writeln!(&mut s, "digraph storage {{").unwrap();
        writeln!(&mut s, " rankdir=LR;").unwrap();
        writeln!(&mut s, " node [shape=box, fontname=\"monospace\"];").unwrap();

        let cat_pk = CATALOG_ROOT_PKEY;
        let cat_color = color_for_batch(*batch_colors.get(&cat_pk).unwrap_or(&0));

        let catalog = self.catalog.read().unwrap();
        writeln!(
            &mut s,
            " n{} [label=\"Catalog pk={} entries={}\" style=filled fillcolor={}];",
            cat_pk,
            cat_pk,
            catalog.map.len(),
            cat_color
        )
        .unwrap();

        for (fid, desc_pk) in catalog.map.iter() {
            if let Some(GetResult::Raw {
                bytes: desc_blob, ..
            }) = pager
                .batch_get(&[BatchGet::Raw { key: *desc_pk }])
                .unwrap()
                .pop()
            {
                let desc = ColumnDescriptor::from_le_bytes(desc_blob.as_ref());
                let dcol = color_for_batch(*batch_colors.get(desc_pk).unwrap_or(&0));
                writeln!(
                    &mut s,
                    " n{} [label=\"ColumnDescriptor pk={} field={:?} rows={} chunks={}\" style=filled fillcolor={}];",
                    desc_pk, desc_pk, fid, desc.total_row_count, desc.total_chunk_count, dcol
                )
                .unwrap();
                writeln!(&mut s, " n{} -> n{};", cat_pk, desc_pk).unwrap();

                // Walk the descriptor-page chain; `prev_page` lets us draw
                // page -> page edges after the first page.
                let mut page_pk = desc.head_page_pk;
                let mut prev_page: Option<PhysicalKey> = None;
                while page_pk != 0 {
                    let pcol = color_for_batch(*batch_colors.get(&page_pk).unwrap_or(&0));
                    if let Some(GetResult::Raw {
                        bytes: page_blob, ..
                    }) = pager
                        .batch_get(&[BatchGet::Raw { key: page_pk }])
                        .unwrap()
                        .pop()
                    {
                        let bytes = page_blob.as_ref();
                        let hdr_sz = DescriptorPageHeader::DISK_SIZE;
                        let hd = DescriptorPageHeader::from_le_bytes(&bytes[..hdr_sz]);

                        writeln!(
                            &mut s,
                            " n{} [label=\"DescPage pk={} entries={}\" style=filled fillcolor={}];",
                            page_pk, page_pk, hd.entry_count, pcol
                        )
                        .unwrap();

                        // First page hangs off the descriptor; later pages
                        // chain from their predecessor.
                        if let Some(ppk) = prev_page {
                            writeln!(&mut s, " n{} -> n{};", ppk, page_pk).unwrap();
                        } else {
                            writeln!(&mut s, " n{} -> n{};", desc_pk, page_pk).unwrap();
                        }

                        for i in 0..(hd.entry_count as usize) {
                            let off = hdr_sz + i * ChunkMetadata::DISK_SIZE;
                            let end = off + ChunkMetadata::DISK_SIZE;
                            let meta = ChunkMetadata::from_le_bytes(&bytes[off..end]);

                            // Data chunk node; skipped when the pager has no
                            // raw blob for the chunk key.
                            if let Some(GetResult::Raw { bytes: b, .. }) = pager
                                .batch_get(&[BatchGet::Raw { key: meta.chunk_pk }])
                                .unwrap()
                                .pop()
                            {
                                let len = b.as_ref().len();
                                let col = color_for_batch(
                                    *batch_colors.get(&meta.chunk_pk).unwrap_or(&0),
                                );
                                writeln!(
                                    &mut s,
                                    " n{} [label=\"Data pk={} bytes={}\" style=filled fillcolor={}];",
                                    meta.chunk_pk, meta.chunk_pk, len, col
                                )
                                .unwrap();
                                writeln!(&mut s, " n{} -> n{};", page_pk, meta.chunk_pk).unwrap();
                            }

                            // Optional value-order permutation blob; pk 0
                            // means "none".
                            if meta.value_order_perm_pk != 0
                                && let Some(GetResult::Raw { bytes: b, .. }) = pager
                                    .batch_get(&[BatchGet::Raw {
                                        key: meta.value_order_perm_pk,
                                    }])
                                    .unwrap()
                                    .pop()
                            {
                                let len = b.as_ref().len();
                                let col = color_for_batch(
                                    *batch_colors.get(&meta.value_order_perm_pk).unwrap_or(&0),
                                );
                                writeln!(
                                    &mut s,
                                    " n{} [label=\"Perm pk={} bytes={}\" style=filled fillcolor={}];",
                                    meta.value_order_perm_pk, meta.value_order_perm_pk, len, col
                                )
                                .unwrap();
                                writeln!(
                                    &mut s,
                                    " n{} -> n{};",
                                    page_pk, meta.value_order_perm_pk
                                )
                                .unwrap();
                            }
                        }
                        prev_page = Some(page_pk);
                        page_pk = hd.next_page_pk;
                    } else {
                        // Unreadable page: stop walking this chain.
                        page_pk = 0;
                    }
                }
            }
        }

        // Legend: one swatch node per batch index up to the maximum seen.
        writeln!(&mut s, " subgraph cluster_legend {{").unwrap();
        writeln!(&mut s, " label=\"Batch legend\";").unwrap();

        let max_batch = batch_colors.values().max().cloned().unwrap_or(0);
        for b in 0..=max_batch {
            writeln!(
                &mut s,
                " l{} [label=\"batch {}\" shape=box style=filled fillcolor={}];",
                b,
                b,
                color_for_batch(b)
            )
            .unwrap();
        }

        // Invisible edge chain keeps the legend nodes in order.
        if max_batch > 0 {
            let legend_nodes: Vec<String> = (0..=max_batch).map(|b| format!("l{}", b)).collect();
            writeln!(&mut s, " {} [style=invis];", legend_nodes.join(" -> ")).unwrap();
        }

        writeln!(&mut s, " }}").unwrap();
        writeln!(&mut s, "}}").unwrap();
        s
    }
}