1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
use serde::{Serialize, Deserialize};
use std::io::{Write, Seek, SeekFrom};

#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;

type Result<T> = std::result::Result<T, std::io::Error>;

pub struct IncrementalJsonWriter<T: FileExt + Write + Seek> { 
    buffer: T,
}
impl<T: FileExt + Write + Seek> IncrementalJsonWriter<T> {
    pub fn new(buffer: T) -> Self { 
        IncrementalJsonWriter::<T> {
            buffer
        }
    }

    pub fn write_json<U: Serialize>(&mut self, element: &U) -> Result<usize> {
        self.write(serde_json::to_string_pretty(&element)?.as_bytes())
    }
    
    #[cfg(unix)]
    fn write_at_offset(&mut self, bytes: &[u8], offset: u64) -> Result<usize> {
        let bytes_written = self.buffer.write_at(bytes, offset)?;
        self.buffer.seek(SeekFrom::Current((bytes_written - 2) as i64)).map(|_| bytes_written)
    }
    #[cfg(windows)]
    fn write_at_offset(&mut self, bytes: &[u8], offset: u64) -> Result<usize> {
        self.buffer.seek_write(bytes, offset)
    }
}

impl<T: FileExt + Write + Seek> Write for IncrementalJsonWriter<T> { 
    fn write(&mut self, element: &[u8]) -> Result<usize> { 
        let mut current = self.buffer.seek(SeekFrom::Current(0))?;
        let mut bytes = vec![];

        if current == 0 { 
            self.buffer.write(b"[\n\n]")?;
            current = self.buffer.seek(SeekFrom::Current(0))?;
        } else { 
            bytes.extend(b",\n");
        }

        bytes.extend(element);
        bytes.push(b'\n');
        bytes.push(b']');

        let written = self.write_at_offset(&bytes, current - 2)?;

        let new_position = self.buffer.seek(SeekFrom::Current(0))?;
        println!("Was: {}, now: {}, total: {}", current, new_position, current + bytes.len() as u64);
        Ok(written)
    }
    fn flush(&mut self) -> Result<()> { 
        self.buffer.flush()
    }
}

#[test]
fn writer_writes_square_brackets_to_buffer() {
    use std::io::{Read};

    let expect_one = String::from("[\n{\n  \"name\": \"Test\",\n  \"detail\": 0\n},\
    \n{\n  \"name\": \"Test\",\n  \"detail\": 1\n}\n]");

    let expect_two = String::from("[\n{\n  \"name\": \"Test\",\n  \"detail\": 0\n},\
    \n{\n  \"name\": \"Test\",\n  \"detail\": 1\n},\
    \n{\n  \"name\": \"Test\",\n  \"detail\": 2\n},\
    \n{\n  \"name\": \"Test\",\n  \"detail\": 3\n}\n]");

    let path = "unittest.json";
    let rows: Vec<Record> = vec![0, 1, 2, 3]
        .iter()
        .map(|num| Record { name: String::from("Test"), detail: *num})
        .collect();
    
    let writer = std::fs::File::create(path).unwrap();
    let mut json_writer = IncrementalJsonWriter::new(writer);

    let mut reader = std::fs::File::open(path).unwrap();
    
    for row in rows.iter().take(2) { 
        json_writer.write_json(&row).unwrap();
    }
    
    let mut buffer = String::new();
    reader.read_to_string(&mut buffer).unwrap();
    assert_eq!(expect_one, buffer);

    for row in rows.iter().skip(2).take(2) { 
        json_writer.write_json(&row).unwrap();
    }
    let mut buffer = String::new();
    reader.seek(SeekFrom::Start(0)).unwrap();
    reader.read_to_string(&mut buffer).unwrap();
    assert_eq!(expect_two, buffer);
}

#[derive(Serialize, Deserialize, Debug)]
struct Record { 
    name: String,
    detail: u32
}