pub struct DiskDriver<T: Clone> { /* private fields */ }
MST walker that reads from disk instead of an in-memory hashmap
Implementations

impl<T: Processable + Send + 'static> DiskDriver<T>
pub async fn next_chunk(
    &mut self,
    n: usize,
) -> Result<Option<BlockChunk<T>>, DriveError>

Walk the MST, returning up to n rkey + record pairs.
while let Some(pairs) = disk_driver.next_chunk(256).await? {
for (rkey, record) in pairs {
println!("{rkey}: size={}", record.len());
}
}
let store = disk_driver.reset_store().await?;

pub fn to_channel(
    self,
    n: usize,
) -> (Receiver<Result<BlockChunk<T>, DriveError>>, JoinHandle<Self>)
Spawn the disk-reading task onto a tokio blocking thread.

The idea is to cut down on round trips to the blocking thread: a single blocking task does all the disk reading and sends records and rkeys back through an mpsc channel instead.

This may also let the disk work continue while the records are being processed. It is not yet clear whether this method actually has much benefit over just using .next_chunk(n).
let (mut rx, join) = disk_driver.to_channel(512);
while let Some(recvd) = rx.recv().await {
let pairs = recvd?;
for (rkey, record) in pairs {
println!("{rkey}: size={}", record.len());
}
}
let store = join.await?.reset_store().await?;

pub async fn reset_store(self) -> Result<DiskStore, DriveError>
Reset the disk storage so it can be reused. You must call this.

Ideally this would live in an impl Drop, but because it makes blocking calls, that would be risky in an async context. For now you have to make sure you call it yourself.
The sqlite store is returned, so it can be reused for another
DiskDriver.
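For instance, the returned store might be threaded into a fresh driver like this (a sketch only: the constructor name DiskDriver::new_with_store is hypothetical and stands in for whatever constructor the crate actually provides):

```rust
// Drain one driver, then reuse its SQLite-backed store for the next walk.
while let Some(pairs) = disk_driver.next_chunk(256).await? {
    for (rkey, record) in pairs {
        println!("{rkey}: size={}", record.len());
    }
}
// reset_store must be called before the store can be reused.
let store = disk_driver.reset_store().await?;
// Hypothetical constructor: build the next driver from the recycled store.
let next_driver = DiskDriver::new_with_store(store);
```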