use crate::connection::Connection;
use crate::error::Result;
use crate::transaction::Transaction;
use crate::wire::consts::op;
use crate::wire::response::read_response;
use crate::wire::stream::op_packet;
// Largest number of bytes `BlobWriter::write` places in a single PUT_SEGMENT packet;
// longer inputs are split into pieces of at most this size.
const MAX_SEGMENT: usize = 65_535;
// Value of the response `handle` field that `Blob::read_segment` treats as end-of-blob.
const SEG_EOF: i32 = 2;
// Sent right after the blob handle in every GET_SEGMENT request.
// NOTE(review): presumably the receive-buffer size requested from the server — confirm.
const SEGMENT_BUFFER: i32 = 0xffff;
/// Read-side state for an opened blob; constructed by `Connection::open_blob`.
#[derive(Debug)]
pub struct Blob {
/// Handle taken from the response to the open request; sent with every segment/close request.
handle: i32,
/// Becomes `true` once a segment response carries `SEG_EOF`; later reads return empty data.
eof: bool,
/// Set by `close`; if still `false` when the value is dropped, `Drop` reports it as unclosed.
done: bool,
}
impl Blob {
    /// Returns the handle this blob was opened with.
    pub fn handle(&self) -> i32 {
        self.handle
    }

    /// Reports whether a previous read has already hit the end of the blob.
    pub fn is_eof(&self) -> bool {
        self.eof
    }

    /// Fetches the next batch of segment data.
    ///
    /// Once end-of-blob has been seen this returns an empty vector without
    /// sending anything. Otherwise a `GET_SEGMENT` request (blob handle,
    /// `SEGMENT_BUFFER`, empty byte string) is sent, the EOF flag is updated
    /// from the response handle, and the response payload is returned with its
    /// length prefixes stripped by `unpack_segments`.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn read_segment(&mut self, conn: &mut Connection) -> Result<Vec<u8>> {
        if self.eof {
            return Ok(Vec::new());
        }

        let mut request = op_packet(op::GET_SEGMENT);
        request.put_i32(self.handle);
        request.put_i32(SEGMENT_BUFFER);
        request.put_bytes(&[]);
        conn.io().send(&request)?;

        let response = read_response(conn.io())?;
        // `eof` is known to be false here, so a plain assignment matches "set on EOF only".
        self.eof = response.handle == SEG_EOF;
        let payload = unpack_segments(&response.data);
        Ok(payload)
    }

    /// Reads segments repeatedly and concatenates them into one buffer.
    ///
    /// The loop ends when end-of-blob is flagged or when a read yields no
    /// bytes, whichever happens first.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `read_segment`; data gathered so
    /// far is discarded in that case.
    pub fn read_to_end(&mut self, conn: &mut Connection) -> Result<Vec<u8>> {
        let mut collected = Vec::new();
        loop {
            let piece = self.read_segment(conn)?;
            collected.extend_from_slice(&piece);
            if self.eof || piece.is_empty() {
                return Ok(collected);
            }
        }
    }

    /// Consumes the blob and sends a `CLOSE_BLOB` request for its handle.
    ///
    /// The blob is marked as done before any I/O takes place, so the drop-time
    /// warning is suppressed even when closing fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn close(mut self, conn: &mut Connection) -> Result<()> {
        self.done = true;

        let mut request = op_packet(op::CLOSE_BLOB);
        request.put_i32(self.handle);
        conn.io().send(&request)?;

        read_response(conn.io())?;
        Ok(())
    }
}
impl Drop for Blob {
    /// Reports the blob through `crate::warn_unclosed` when it is dropped
    /// without having been marked done.
    fn drop(&mut self) {
        if self.done {
            return;
        }
        crate::warn_unclosed("Blob", self.handle);
    }
}
/// Write-side state for a newly created blob; constructed by `Connection::create_blob`.
#[derive(Debug)]
pub struct BlobWriter {
/// Handle taken from the response to the create request; sent with every segment/cancel/close request.
handle: i32,
/// Blob id taken from the response to the create request; returned by `blob_id` and `close`.
blob_id: u64,
/// Set by `cancel` and `close`; if still `false` when the value is dropped, `Drop` reports it as unclosed.
done: bool,
}
impl BlobWriter {
    /// Returns the blob id reported by the server when the blob was created.
    pub fn blob_id(&self) -> u64 {
        self.blob_id
    }

    /// Sends `data` to the server in pieces of at most `MAX_SEGMENT` bytes.
    ///
    /// Each piece goes out in its own `PUT_SEGMENT` packet (blob handle, piece
    /// length, piece bytes) and its response is read before the next piece is
    /// sent. Empty `data` sends nothing.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from sending a packet or reading
    /// its response.
    pub fn write(&self, conn: &mut Connection, data: &[u8]) -> Result<()> {
        let mut remaining = data;
        while !remaining.is_empty() {
            let cut = remaining.len().min(MAX_SEGMENT);
            let (segment, tail) = remaining.split_at(cut);

            let mut request = op_packet(op::PUT_SEGMENT);
            request.put_i32(self.handle);
            // `segment.len()` never exceeds MAX_SEGMENT, so the cast cannot truncate.
            request.put_i32(segment.len() as i32);
            request.put_bytes(segment);
            conn.io().send(&request)?;
            read_response(conn.io())?;

            remaining = tail;
        }
        Ok(())
    }

    /// Consumes the writer and sends a `CANCEL_BLOB` request for its handle.
    ///
    /// The writer is marked as done before any I/O takes place, so the
    /// drop-time warning is suppressed even when cancelling fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn cancel(mut self, conn: &mut Connection) -> Result<()> {
        self.done = true;

        let mut request = op_packet(op::CANCEL_BLOB);
        request.put_i32(self.handle);
        conn.io().send(&request)?;

        read_response(conn.io())?;
        Ok(())
    }

    /// Consumes the writer, sends a `CLOSE_BLOB` request for its handle and
    /// returns the blob id on success.
    ///
    /// The writer is marked as done before any I/O takes place, so the
    /// drop-time warning is suppressed even when closing fails.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn close(mut self, conn: &mut Connection) -> Result<u64> {
        self.done = true;

        let mut request = op_packet(op::CLOSE_BLOB);
        request.put_i32(self.handle);
        conn.io().send(&request)?;

        read_response(conn.io())?;
        let id = self.blob_id;
        Ok(id)
    }
}
impl Drop for BlobWriter {
    /// Reports the writer through `crate::warn_unclosed` when it is dropped
    /// without having been marked done.
    fn drop(&mut self) {
        if self.done {
            return;
        }
        crate::warn_unclosed("BlobWriter", self.handle);
    }
}
impl Connection {
    /// Opens an existing blob for reading within the given transaction.
    ///
    /// Sends an `OPEN_BLOB2` packet containing an empty byte string, the
    /// transaction handle and the blob id, then wraps the handle from the
    /// response in a fresh [`Blob`].
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn open_blob(&mut self, tx: &Transaction, blob_id: u64) -> Result<Blob> {
        let mut request = op_packet(op::OPEN_BLOB2);
        // NOTE(review): the empty byte string is presumably an empty blob parameter buffer — confirm.
        request.put_bytes(&[]);
        request.put_i32(tx.handle());
        request.put_i64(blob_id as i64);
        self.io().send(&request)?;

        let response = read_response(self.io())?;
        let handle = response.handle;
        Ok(Blob {
            handle,
            eof: false,
            done: false,
        })
    }

    /// Creates a new blob for writing within the given transaction.
    ///
    /// Sends a `CREATE_BLOB2` packet containing an empty byte string, the
    /// transaction handle and a zero 64-bit value, then builds a
    /// [`BlobWriter`] from the handle and blob id found in the response.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the request or reading the response.
    pub fn create_blob(&mut self, tx: &Transaction) -> Result<BlobWriter> {
        let mut request = op_packet(op::CREATE_BLOB2);
        request.put_bytes(&[]);
        request.put_i32(tx.handle());
        request.put_i64(0);
        self.io().send(&request)?;

        let response = read_response(self.io())?;
        let handle = response.handle;
        let blob_id = response.blob_id;
        Ok(BlobWriter {
            handle,
            blob_id,
            done: false,
        })
    }

    /// Creates a blob, writes `data` into it and closes it, returning the
    /// blob id.
    ///
    /// If writing fails, the blob is cancelled on a best-effort basis (the
    /// outcome of the cancel is deliberately ignored) and the write error is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns the error from creating, writing or closing the blob.
    pub fn write_blob(&mut self, tx: &Transaction, data: &[u8]) -> Result<u64> {
        let writer = self.create_blob(tx)?;
        match writer.write(self, data) {
            Ok(()) => writer.close(self),
            Err(write_err) => {
                // Best effort only: the caller cares about the original write failure.
                let _ = writer.cancel(self);
                Err(write_err)
            }
        }
    }

    /// Opens a blob, reads all of its data and closes it.
    ///
    /// The blob is closed regardless of whether reading succeeded. A read
    /// error takes precedence over a close error.
    ///
    /// # Errors
    ///
    /// Returns the error from opening, reading or closing the blob.
    pub fn read_blob(&mut self, tx: &Transaction, blob_id: u64) -> Result<Vec<u8>> {
        let mut blob = self.open_blob(tx, blob_id)?;
        let read_outcome = blob.read_to_end(self);
        let close_outcome = blob.close(self);

        // Check the read first so its error wins when both steps failed.
        let data = read_outcome?;
        close_outcome?;
        Ok(data)
    }
}
/// Strips the length prefixes from a buffer of packed segments and returns the
/// concatenated segment bytes.
///
/// The buffer is read as a sequence of records, each a 2-byte little-endian
/// length followed by that many bytes. A final record whose declared length
/// runs past the end of the buffer contributes only the bytes that are present,
/// and a single trailing byte that cannot form a length prefix is ignored.
fn unpack_segments(data: &[u8]) -> Vec<u8> {
    let mut payload = Vec::new();
    let mut rest = data;
    while rest.len() >= 2 {
        let (prefix, body) = rest.split_at(2);
        let declared = usize::from(u16::from_le_bytes([prefix[0], prefix[1]]));
        // Tolerate a truncated last segment by taking only what is available.
        let available = declared.min(body.len());
        let (segment, remainder) = body.split_at(available);
        payload.extend_from_slice(segment);
        rest = remainder;
    }
    payload
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two length-prefixed segments come back concatenated, prefixes removed.
    #[test]
    fn unpacks_packed_segments() {
        let packed: &[u8] = b"\x02\x00Hi\x05\x00there";
        let unpacked = unpack_segments(packed);
        assert_eq!(unpacked, b"Hithere".to_vec());
    }

    /// An empty buffer unpacks to an empty result.
    #[test]
    fn unpacks_empty_buffer() {
        let unpacked = unpack_segments(&[]);
        assert_eq!(unpacked.len(), 0);
    }
}