vortex_serde/layouts/read/
context.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use std::fmt::Debug;
use std::sync::Arc;

use ahash::HashMap;
use bytes::Bytes;
use vortex::Context;
use vortex_error::{vortex_err, VortexResult};
use vortex_flatbuffers::footer as fb;
use vortex_flatbuffers::footer::LayoutVariant;

use crate::layouts::read::cache::RelativeLayoutCache;
use crate::layouts::read::layouts::{ChunkedLayoutSpec, ColumnLayoutSpec, FlatLayout};
use crate::layouts::read::{LayoutReader, Scan};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct LayoutId(pub u16);

pub trait LayoutSpec: Debug + Send + Sync {
    fn id(&self) -> LayoutId;

    fn layout(
        &self,
        fb_bytes: Bytes,
        fb_loc: usize,
        scan: Scan,
        layout_reader: LayoutDeserializer,
        message_cache: RelativeLayoutCache,
    ) -> Box<dyn LayoutReader>;
}

pub type LayoutSpecRef = &'static dyn LayoutSpec;

#[derive(Debug, Clone)]
pub struct LayoutContext {
    layout_refs: HashMap<LayoutId, LayoutSpecRef>,
}

impl LayoutContext {
    pub fn new(layout_refs: HashMap<LayoutId, LayoutSpecRef>) -> Self {
        Self { layout_refs }
    }

    pub fn lookup_layout(&self, id: &LayoutId) -> Option<LayoutSpecRef> {
        self.layout_refs.get(id).cloned()
    }
}

impl Default for LayoutContext {
    fn default() -> Self {
        Self::new(
            [&ColumnLayoutSpec as LayoutSpecRef, &ChunkedLayoutSpec]
                .into_iter()
                .map(|l| (l.id(), l))
                .collect(),
        )
    }
}

#[derive(Default, Debug, Clone)]
pub struct LayoutDeserializer {
    ctx: Arc<Context>,
    layout_ctx: Arc<LayoutContext>,
}

impl LayoutDeserializer {
    pub fn new(ctx: Arc<Context>, layout_ctx: Arc<LayoutContext>) -> Self {
        Self { ctx, layout_ctx }
    }

    pub fn read_layout(
        &self,
        fb_bytes: Bytes,
        fb_loc: usize,
        scan: Scan,
        message_cache: RelativeLayoutCache,
    ) -> VortexResult<Box<dyn LayoutReader>> {
        let fb_layout = unsafe {
            let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
            fb::Layout::init_from_table(tab)
        };

        match fb_layout.layout_type() {
            LayoutVariant::FlatLayout => {
                let flat_layout = fb_layout
                    .layout_as_flat_layout()
                    .ok_or_else(|| vortex_err!("Must be flat layout"))?;
                Ok(Box::new(FlatLayout::new(
                    flat_layout.begin(),
                    flat_layout.end(),
                    self.ctx.clone(),
                    message_cache,
                )))
            }
            LayoutVariant::NestedLayout => {
                let nested_layout = fb_layout
                    .layout_as_nested_layout()
                    .ok_or_else(|| vortex_err!("Must be nested layout"))?;
                Ok(self
                    .layout_ctx
                    .lookup_layout(&LayoutId(nested_layout.encoding()))
                    .ok_or_else(|| {
                        vortex_err!("Unknown layout definition {}", nested_layout.encoding())
                    })?
                    .layout(fb_bytes, fb_loc, scan, self.clone(), message_cache))
            }
            _ => unreachable!("Unknown flatbuffer layout"),
        }
    }
}