use std::cell::RefCell;
use std::io::prelude::*;
use std::io::Cursor;
use std::io::SeekFrom;
use num_traits::ToPrimitive;
use super::os;
use crate::function::{OptionalArg, PyFuncArgs};
use crate::obj::objbytearray::PyByteArray;
use crate::obj::objbytes;
use crate::obj::objbytes::PyBytes;
use crate::obj::objint::{self, PyIntRef};
use crate::obj::objstr::{self, PyStringRef};
use crate::obj::objtype;
use crate::obj::objtype::PyClassRef;
use crate::pyobject::TypeProtocol;
use crate::pyobject::{BufferProtocol, Either, PyObjectRef, PyRef, PyResult, PyValue};
use crate::vm::VirtualMachine;
fn byte_count(bytes: OptionalArg<Option<PyObjectRef>>) -> i64 {
match bytes {
OptionalArg::Present(Some(ref int)) => objint::get_value(int).to_i64().unwrap(),
_ => (-1 as i64),
}
}
#[derive(Debug)]
struct BufferedIO {
cursor: Cursor<Vec<u8>>,
}
impl BufferedIO {
fn new(cursor: Cursor<Vec<u8>>) -> BufferedIO {
BufferedIO { cursor }
}
fn write(&mut self, data: &[u8]) -> Option<u64> {
let length = data.len();
match self.cursor.write_all(data) {
Ok(_) => Some(length as u64),
Err(_) => None,
}
}
fn getvalue(&self) -> Vec<u8> {
self.cursor.clone().into_inner()
}
fn seek(&mut self, offset: u64) -> Option<u64> {
match self.cursor.seek(SeekFrom::Start(offset)) {
Ok(_) => Some(offset),
Err(_) => None,
}
}
fn read(&mut self, bytes: i64) -> Option<Vec<u8>> {
let mut buffer = Vec::new();
if bytes > 0 {
let mut handle = self.cursor.clone().take(bytes as u64);
if handle.read_to_end(&mut buffer).is_err() {
return None;
}
self.cursor = handle.into_inner();
} else {
if self.cursor.read_to_end(&mut buffer).is_err() {
return None;
}
};
Some(buffer)
}
}
#[derive(Debug)]
struct PyStringIO {
buffer: RefCell<BufferedIO>,
}
type PyStringIORef = PyRef<PyStringIO>;
impl PyValue for PyStringIO {
fn class(vm: &VirtualMachine) -> PyClassRef {
vm.class("io", "StringIO")
}
}
impl PyStringIORef {
fn write(self, data: PyStringRef, vm: &VirtualMachine) -> PyResult {
let bytes = data.as_str().as_bytes();
match self.buffer.borrow_mut().write(bytes) {
Some(value) => Ok(vm.ctx.new_int(value)),
None => Err(vm.new_type_error("Error Writing String".to_string())),
}
}
fn getvalue(self, vm: &VirtualMachine) -> PyResult {
match String::from_utf8(self.buffer.borrow().getvalue()) {
Ok(result) => Ok(vm.ctx.new_str(result)),
Err(_) => Err(vm.new_value_error("Error Retrieving Value".to_string())),
}
}
fn seek(self, offset: PyObjectRef, vm: &VirtualMachine) -> PyResult {
let position = objint::get_value(&offset).to_u64().unwrap();
match self.buffer.borrow_mut().seek(position) {
Some(value) => Ok(vm.ctx.new_int(value)),
None => Err(vm.new_value_error("Error Performing Operation".to_string())),
}
}
fn seekable(self, _vm: &VirtualMachine) -> bool {
true
}
fn read(self, bytes: OptionalArg<Option<PyObjectRef>>, vm: &VirtualMachine) -> PyResult {
let data = match self.buffer.borrow_mut().read(byte_count(bytes)) {
Some(value) => value,
None => Vec::new(),
};
match String::from_utf8(data) {
Ok(value) => Ok(vm.ctx.new_str(value)),
Err(_) => Err(vm.new_value_error("Error Retrieving Value".to_string())),
}
}
}
fn string_io_new(
cls: PyClassRef,
object: OptionalArg<Option<PyObjectRef>>,
vm: &VirtualMachine,
) -> PyResult<PyStringIORef> {
let raw_string = match object {
OptionalArg::Present(Some(ref input)) => objstr::get_value(input),
_ => String::new(),
};
PyStringIO {
buffer: RefCell::new(BufferedIO::new(Cursor::new(raw_string.into_bytes()))),
}
.into_ref_with_type(vm, cls)
}
#[derive(Debug)]
struct PyBytesIO {
buffer: RefCell<BufferedIO>,
}
type PyBytesIORef = PyRef<PyBytesIO>;
impl PyValue for PyBytesIO {
fn class(vm: &VirtualMachine) -> PyClassRef {
vm.class("io", "BytesIO")
}
}
impl PyBytesIORef {
fn write(self, data: objbytes::PyBytesRef, vm: &VirtualMachine) -> PyResult {
let bytes = data.get_value();
match self.buffer.borrow_mut().write(bytes) {
Some(value) => Ok(vm.ctx.new_int(value)),
None => Err(vm.new_type_error("Error Writing Bytes".to_string())),
}
}
fn getvalue(self, vm: &VirtualMachine) -> PyResult {
Ok(vm.ctx.new_bytes(self.buffer.borrow().getvalue()))
}
fn read(self, bytes: OptionalArg<Option<PyObjectRef>>, vm: &VirtualMachine) -> PyResult {
match self.buffer.borrow_mut().read(byte_count(bytes)) {
Some(value) => Ok(vm.ctx.new_bytes(value)),
None => Err(vm.new_value_error("Error Retrieving Value".to_string())),
}
}
fn seek(self, offset: PyObjectRef, vm: &VirtualMachine) -> PyResult {
let position = objint::get_value(&offset).to_u64().unwrap();
match self.buffer.borrow_mut().seek(position) {
Some(value) => Ok(vm.ctx.new_int(value)),
None => Err(vm.new_value_error("Error Performing Operation".to_string())),
}
}
fn seekable(self, _vm: &VirtualMachine) -> bool {
true
}
}
fn bytes_io_new(
cls: PyClassRef,
object: OptionalArg<Option<PyObjectRef>>,
vm: &VirtualMachine,
) -> PyResult<PyBytesIORef> {
let raw_bytes = match object {
OptionalArg::Present(Some(ref input)) => objbytes::get_value(input).to_vec(),
_ => vec![],
};
PyBytesIO {
buffer: RefCell::new(BufferedIO::new(Cursor::new(raw_bytes))),
}
.into_ref_with_type(vm, cls)
}
fn io_base_cm_enter(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(instance, None)]);
Ok(instance.clone())
}
fn io_base_cm_exit(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(
vm,
args,
required = [
(_instance, None),
(_exception_type, None),
(_exception_value, None),
(_traceback, None)
]
);
Ok(vm.get_none())
}
fn io_base_flush(_self: PyObjectRef, _vm: &VirtualMachine) {}
fn io_base_seekable(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
Ok(vm.ctx.new_bool(false))
}
fn buffered_io_base_init(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(buffered, None), (raw, None)]);
vm.set_attr(buffered, "raw", raw.clone())?;
Ok(vm.get_none())
}
fn buffered_reader_read(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(buffered, None)]);
let buff_size = 8 * 1024;
let buffer = vm.ctx.new_bytearray(vec![0; buff_size]);
let mut result = vec![];
let mut length = buff_size;
let raw = vm.get_attribute(buffered.clone(), "raw").unwrap();
while length == buff_size {
vm.call_method(&raw, "readinto", vec![buffer.clone()])
.map_err(|_| vm.new_value_error("IO Error".to_string()))?;
if let Some(bytes) = buffer.payload::<PyByteArray>() {
result.extend_from_slice(&bytes.inner.borrow().elements);
};
let py_len = vm.call_method(&buffer, "__len__", PyFuncArgs::default())?;
length = objint::get_value(&py_len).to_usize().unwrap();
}
Ok(vm.ctx.new_bytes(result))
}
fn buffered_reader_seekable(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
Ok(vm.ctx.new_bool(true))
}
fn compute_c_flag(mode: &str) -> u32 {
let flag = match mode.chars().next() {
Some(mode) => match mode {
'w' => libc::O_WRONLY | libc::O_CREAT,
'x' => libc::O_WRONLY | libc::O_CREAT | libc::O_EXCL,
'a' => libc::O_APPEND,
'+' => libc::O_RDWR,
_ => libc::O_RDONLY,
},
None => libc::O_RDONLY,
};
flag as u32
}
fn file_io_init(
file_io: PyObjectRef,
name: Either<PyStringRef, PyIntRef>,
mode: OptionalArg<PyStringRef>,
vm: &VirtualMachine,
) -> PyResult {
let file_no = match &name {
Either::A(name) => {
let mode = match mode {
OptionalArg::Present(mode) => compute_c_flag(mode.as_str()),
OptionalArg::Missing => libc::O_RDONLY as _,
};
let fno = os::os_open(
name.clone(),
mode as _,
OptionalArg::Missing,
OptionalArg::Missing,
vm,
)?;
vm.new_int(fno)
}
Either::B(fno) => fno.clone().into_object(),
};
vm.set_attr(&file_io, "name", name.into_object())?;
vm.set_attr(&file_io, "fileno", file_no)?;
vm.set_attr(&file_io, "closefd", vm.new_bool(false))?;
vm.set_attr(&file_io, "closed", vm.new_bool(false))?;
Ok(vm.get_none())
}
fn file_io_read(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(
vm,
args,
required = [(file_io, None)],
optional = [(read_byte, Some(vm.ctx.int_type()))]
);
let file_no = vm.get_attribute(file_io.clone(), "fileno")?;
let raw_fd = objint::get_value(&file_no).to_i64().unwrap();
let mut handle = os::rust_file(raw_fd);
let bytes = match read_byte {
None => {
let mut bytes = vec![];
handle
.read_to_end(&mut bytes)
.map_err(|_| vm.new_value_error("Error reading from Buffer".to_string()))?;
bytes
}
Some(read_byte) => {
let mut bytes = vec![0; objint::get_value(&read_byte).to_usize().unwrap()];
handle
.read_exact(&mut bytes)
.map_err(|_| vm.new_value_error("Error reading from Buffer".to_string()))?;
let updated = os::raw_file_number(handle);
vm.set_attr(file_io, "fileno", vm.ctx.new_int(updated))?;
bytes
}
};
Ok(vm.ctx.new_bytes(bytes))
}
fn file_io_readinto(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(file_io, None), (obj, None)]);
if !obj.readonly() {
return Ok(vm.new_type_error(
"readinto() argument must be read-write bytes-like object".to_string(),
));
}
let py_length = vm.call_method(obj, "__len__", PyFuncArgs::default())?;
let length = objint::get_value(&py_length).to_u64().unwrap();
let file_no = vm.get_attribute(file_io.clone(), "fileno")?;
let raw_fd = objint::get_value(&file_no).to_i64().unwrap();
let handle = os::rust_file(raw_fd);
let mut f = handle.take(length);
if let Some(bytes) = obj.payload::<PyByteArray>() {
let value_mut = &mut bytes.inner.borrow_mut().elements;
value_mut.clear();
match f.read_to_end(value_mut) {
Ok(_) => {}
Err(_) => return Err(vm.new_value_error("Error reading from Take".to_string())),
}
};
let updated = os::raw_file_number(f.into_inner());
vm.set_attr(file_io, "fileno", vm.ctx.new_int(updated))?;
Ok(vm.get_none())
}
fn file_io_write(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(file_io, None), (obj, None)]);
let file_no = vm.get_attribute(file_io.clone(), "fileno")?;
let raw_fd = objint::get_value(&file_no).to_i64().unwrap();
let mut handle = os::rust_file(raw_fd);
let bytes = match_class!(match obj.clone() {
i @ PyBytes => Ok(i.get_value().to_vec()),
j @ PyByteArray => Ok(j.inner.borrow().elements.to_vec()),
obj => Err(vm.new_type_error(format!(
"a bytes-like object is required, not {}",
obj.class()
))),
});
match handle.write(&bytes?) {
Ok(len) => {
let updated = os::raw_file_number(handle);
vm.set_attr(file_io, "fileno", vm.ctx.new_int(updated))?;
Ok(vm.ctx.new_int(len))
}
Err(_) => Err(vm.new_value_error("Error Writing Bytes to Handle".to_string())),
}
}
#[cfg(windows)]
fn file_io_close(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
use std::os::windows::io::IntoRawHandle;
arg_check!(vm, args, required = [(file_io, None)]);
let file_no = vm.get_attribute(file_io.clone(), "fileno")?;
let raw_fd = objint::get_value(&file_no).to_i64().unwrap();
let handle = os::rust_file(raw_fd);
let raw_handle = handle.into_raw_handle();
unsafe {
kernel32::CloseHandle(raw_handle);
}
vm.set_attr(file_io, "closefd", vm.new_bool(true))?;
vm.set_attr(file_io, "closed", vm.new_bool(true))?;
Ok(vm.ctx.none())
}
#[cfg(unix)]
fn file_io_close(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(file_io, None)]);
let file_no = vm.get_attribute(file_io.clone(), "fileno")?;
let raw_fd = objint::get_value(&file_no).to_i32().unwrap();
unsafe {
libc::close(raw_fd);
}
vm.set_attr(file_io, "closefd", vm.new_bool(true))?;
vm.set_attr(file_io, "closed", vm.new_bool(true))?;
Ok(vm.ctx.none())
}
fn file_io_seekable(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
Ok(vm.ctx.new_bool(true))
}
fn buffered_writer_write(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(
vm,
args,
required = [(buffered, None), (obj, Some(vm.ctx.bytes_type()))]
);
let raw = vm.get_attribute(buffered.clone(), "raw").unwrap();
vm.call_method(&raw, "write", vec![obj.clone()])
}
fn buffered_writer_seekable(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
Ok(vm.ctx.new_bool(true))
}
fn text_io_wrapper_init(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(
vm,
args,
required = [(text_io_wrapper, None), (buffer, None)]
);
vm.set_attr(text_io_wrapper, "buffer", buffer.clone())?;
Ok(vm.get_none())
}
fn text_io_wrapper_seekable(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
Ok(vm.new_bool(true))
}
fn text_io_base_read(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(vm, args, required = [(text_io_base, None)]);
let buffered_reader_class = vm.try_class("_io", "BufferedReader")?;
let raw = vm.get_attribute(text_io_base.clone(), "buffer").unwrap();
if !objtype::isinstance(&raw, &buffered_reader_class) {
return Err(vm.new_value_error("not readable".to_string()));
}
if let Ok(bytes) = vm.call_method(&raw, "read", PyFuncArgs::default()) {
let value = objbytes::get_value(&bytes).to_vec();
let rust_string = String::from_utf8(value).map_err(|e| {
vm.new_unicode_decode_error(format!(
"cannot decode byte at index: {}",
e.utf8_error().valid_up_to()
))
})?;
Ok(vm.ctx.new_str(rust_string))
} else {
Err(vm.new_value_error("Error unpacking Bytes".to_string()))
}
}
fn text_io_base_write(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
use std::str::from_utf8;
arg_check!(
vm,
args,
required = [(text_io_base, None), (obj, Some(vm.ctx.str_type()))]
);
let buffered_writer_class = vm.try_class("_io", "BufferedWriter")?;
let raw = vm.get_attribute(text_io_base.clone(), "buffer").unwrap();
if !objtype::isinstance(&raw, &buffered_writer_class) {
return Err(vm.new_value_error("not writable".to_string()));
}
let bytes = objstr::get_value(obj).into_bytes();
let len = vm.call_method(&raw, "write", vec![vm.ctx.new_bytes(bytes.clone())])?;
let len = objint::get_value(&len).to_usize().ok_or_else(|| {
vm.new_overflow_error("int to large to convert to Rust usize".to_string())
})?;
let len = from_utf8(&bytes[..len])
.unwrap_or_else(|e| from_utf8(&bytes[..e.valid_up_to()]).unwrap())
.chars()
.count();
Ok(vm.ctx.new_int(len))
}
fn split_mode_string(mode_string: String) -> Result<(String, String), String> {
let mut mode: char = '\0';
let mut typ: char = '\0';
let mut plus_is_set = false;
for ch in mode_string.chars() {
match ch {
'+' => {
if plus_is_set {
return Err(format!("invalid mode: '{}'", mode_string));
}
plus_is_set = true;
}
't' | 'b' => {
if typ != '\0' {
if typ == ch {
return Err(format!("invalid mode: '{}'", mode_string));
} else {
return Err("can't have text and binary mode at once".to_string());
}
}
typ = ch;
}
'a' | 'r' | 'w' => {
if mode != '\0' {
if mode == ch {
return Err(format!("invalid mode: '{}'", mode_string));
} else {
return Err(
"must have exactly one of create/read/write/append mode".to_string()
);
}
}
mode = ch;
}
_ => return Err(format!("invalid mode: '{}'", mode_string)),
}
}
if mode == '\0' {
return Err(
"Must have exactly one of create/read/write/append mode and at most one plus"
.to_string(),
);
}
let mut mode = mode.to_string();
if plus_is_set {
mode.push('+');
}
if typ == '\0' {
typ = 't';
}
Ok((mode, typ.to_string()))
}
pub fn io_open(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
arg_check!(
vm,
args,
required = [(file, None)],
optional = [(mode, Some(vm.ctx.str_type()))]
);
let mode_string = mode.map_or("rt".to_string(), objstr::get_value);
let (mode, typ) = match split_mode_string(mode_string) {
Ok((mode, typ)) => (mode, typ),
Err(error_message) => {
return Err(vm.new_value_error(error_message));
}
};
let io_module = vm.import("_io", &[], 0)?;
let file_io_class = vm.get_attribute(io_module.clone(), "FileIO").unwrap();
let file_io_obj = vm.invoke(
&file_io_class,
vec![file.clone(), vm.ctx.new_str(mode.clone())],
)?;
let buffered = match mode.chars().next().unwrap() {
'w' => {
let buffered_writer_class = vm
.get_attribute(io_module.clone(), "BufferedWriter")
.unwrap();
vm.invoke(&buffered_writer_class, vec![file_io_obj.clone()])
}
'r' => {
let buffered_reader_class = vm
.get_attribute(io_module.clone(), "BufferedReader")
.unwrap();
vm.invoke(&buffered_reader_class, vec![file_io_obj.clone()])
}
_ => unimplemented!("'a' mode is not yet implemented"),
};
let io_obj = match typ.chars().next().unwrap() {
't' => {
let text_io_wrapper_class = vm.get_attribute(io_module, "TextIOWrapper").unwrap();
vm.invoke(&text_io_wrapper_class, vec![buffered.unwrap()])
}
'b' => buffered,
_ => unreachable!(),
};
io_obj
}
pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
let ctx = &vm.ctx;
let io_base = py_class!(ctx, "IOBase", ctx.object(), {
"__enter__" => ctx.new_rustfunc(io_base_cm_enter),
"__exit__" => ctx.new_rustfunc(io_base_cm_exit),
"seekable" => ctx.new_rustfunc(io_base_seekable),
"flush" => ctx.new_rustfunc(io_base_flush)
});
let raw_io_base = py_class!(ctx, "RawIOBase", io_base.clone(), {});
let buffered_io_base = py_class!(ctx, "BufferedIOBase", io_base.clone(), {});
let text_io_base = py_class!(ctx, "TextIOBase", io_base.clone(), {
"read" => ctx.new_rustfunc(text_io_base_read),
"write" => ctx.new_rustfunc(text_io_base_write)
});
let file_io = py_class!(ctx, "FileIO", raw_io_base.clone(), {
"__init__" => ctx.new_rustfunc(file_io_init),
"name" => ctx.str_type(),
"read" => ctx.new_rustfunc(file_io_read),
"readinto" => ctx.new_rustfunc(file_io_readinto),
"write" => ctx.new_rustfunc(file_io_write),
"close" => ctx.new_rustfunc(file_io_close),
"seekable" => ctx.new_rustfunc(file_io_seekable)
});
let buffered_reader = py_class!(ctx, "BufferedReader", buffered_io_base.clone(), {
"__init__" => ctx.new_rustfunc(buffered_io_base_init),
"read" => ctx.new_rustfunc(buffered_reader_read),
"seekable" => ctx.new_rustfunc(buffered_reader_seekable)
});
let buffered_writer = py_class!(ctx, "BufferedWriter", buffered_io_base.clone(), {
"__init__" => ctx.new_rustfunc(buffered_io_base_init),
"write" => ctx.new_rustfunc(buffered_writer_write),
"seekable" => ctx.new_rustfunc(buffered_writer_seekable)
});
let text_io_wrapper = py_class!(ctx, "TextIOWrapper", text_io_base.clone(), {
"__init__" => ctx.new_rustfunc(text_io_wrapper_init),
"seekable" => ctx.new_rustfunc(text_io_wrapper_seekable)
});
let string_io = py_class!(ctx, "StringIO", text_io_base.clone(), {
"__new__" => ctx.new_rustfunc(string_io_new),
"seek" => ctx.new_rustfunc(PyStringIORef::seek),
"seekable" => ctx.new_rustfunc(PyStringIORef::seekable),
"read" => ctx.new_rustfunc(PyStringIORef::read),
"write" => ctx.new_rustfunc(PyStringIORef::write),
"getvalue" => ctx.new_rustfunc(PyStringIORef::getvalue)
});
let bytes_io = py_class!(ctx, "BytesIO", buffered_io_base.clone(), {
"__new__" => ctx.new_rustfunc(bytes_io_new),
"read" => ctx.new_rustfunc(PyBytesIORef::read),
"read1" => ctx.new_rustfunc(PyBytesIORef::read),
"seek" => ctx.new_rustfunc(PyBytesIORef::seek),
"seekable" => ctx.new_rustfunc(PyBytesIORef::seekable),
"write" => ctx.new_rustfunc(PyBytesIORef::write),
"getvalue" => ctx.new_rustfunc(PyBytesIORef::getvalue)
});
py_module!(vm, "_io", {
"open" => ctx.new_rustfunc(io_open),
"IOBase" => io_base,
"RawIOBase" => raw_io_base,
"BufferedIOBase" => buffered_io_base,
"TextIOBase" => text_io_base,
"FileIO" => file_io,
"BufferedReader" => buffered_reader,
"BufferedWriter" => buffered_writer,
"TextIOWrapper" => text_io_wrapper,
"StringIO" => string_io,
"BytesIO" => bytes_io,
"DEFAULT_BUFFER_SIZE" => ctx.new_int(8 * 1024),
})
}
#[cfg(test)]
mod tests {
use super::*;
fn assert_mode_split_into(mode_string: &str, expected_mode: &str, expected_typ: &str) {
let (mode, typ) = split_mode_string(mode_string.to_string()).unwrap();
assert_eq!(mode, expected_mode);
assert_eq!(typ, expected_typ);
}
#[test]
fn test_split_mode_valid_cases() {
assert_mode_split_into("r", "r", "t");
assert_mode_split_into("rb", "r", "b");
assert_mode_split_into("rt", "r", "t");
assert_mode_split_into("r+t", "r+", "t");
assert_mode_split_into("w+t", "w+", "t");
assert_mode_split_into("r+b", "r+", "b");
assert_mode_split_into("w+b", "w+", "b");
}
#[test]
fn test_invalid_mode() {
assert_eq!(
split_mode_string("rbsss".to_string()),
Err("invalid mode: 'rbsss'".to_string())
);
assert_eq!(
split_mode_string("rrb".to_string()),
Err("invalid mode: 'rrb'".to_string())
);
assert_eq!(
split_mode_string("rbb".to_string()),
Err("invalid mode: 'rbb'".to_string())
);
}
#[test]
fn test_mode_not_specified() {
assert_eq!(
split_mode_string("".to_string()),
Err(
"Must have exactly one of create/read/write/append mode and at most one plus"
.to_string()
)
);
assert_eq!(
split_mode_string("b".to_string()),
Err(
"Must have exactly one of create/read/write/append mode and at most one plus"
.to_string()
)
);
assert_eq!(
split_mode_string("t".to_string()),
Err(
"Must have exactly one of create/read/write/append mode and at most one plus"
.to_string()
)
);
}
#[test]
fn test_text_and_binary_at_once() {
assert_eq!(
split_mode_string("rbt".to_string()),
Err("can't have text and binary mode at once".to_string())
);
}
#[test]
fn test_exactly_one_mode() {
assert_eq!(
split_mode_string("rwb".to_string()),
Err("must have exactly one of create/read/write/append mode".to_string())
);
}
#[test]
fn test_at_most_one_plus() {
assert_eq!(
split_mode_string("a++".to_string()),
Err("invalid mode: 'a++'".to_string())
);
}
#[test]
fn test_buffered_read() {
let data = vec![1, 2, 3, 4];
let bytes: i64 = -1;
let mut buffered = BufferedIO {
cursor: Cursor::new(data.clone()),
};
assert_eq!(buffered.read(bytes).unwrap(), data);
}
#[test]
fn test_buffered_seek() {
let data = vec![1, 2, 3, 4];
let count: u64 = 2;
let mut buffered = BufferedIO {
cursor: Cursor::new(data.clone()),
};
assert_eq!(buffered.seek(count.clone()).unwrap(), count);
assert_eq!(buffered.read(count.clone() as i64).unwrap(), vec![3, 4]);
}
#[test]
fn test_buffered_value() {
let data = vec![1, 2, 3, 4];
let buffered = BufferedIO {
cursor: Cursor::new(data.clone()),
};
assert_eq!(buffered.getvalue(), data);
}
}