use crate::{
buffer::{self, Change, Text},
epoch::{Cursor, DirEntry, Epoch, EpochId, FileId, FileType, FsOperation, ROOT_FILE_ID},
error::Error,
id::{Oid, ReplicaId},
point::Point,
time::{Global, Lamport},
};
use futures::{
future,
stream::{self, StreamExt},
};
use std::{
cell::{Ref, RefCell, RefMut},
cmp::Ordering,
collections::HashMap,
io,
ops::Range,
path::{Path, PathBuf},
rc::Rc,
vec,
};
pub trait GitProvider {
fn base_entries(&self, oid: Oid) -> stream::BoxStream<'static, io::Result<DirEntry>>;
fn base_text(&self, oid: Oid, path: &Path) -> future::BoxFuture<'static, io::Result<String>>;
}
#[derive(Clone)]
pub struct WorkTree {
epoch: Rc<RefCell<Option<Epoch>>>,
buffers: Rc<RefCell<HashMap<BufferId, FileId>>>,
next_buffer_id: Rc<RefCell<BufferId>>,
deferred_ops: Rc<RefCell<HashMap<EpochId, Vec<FsOperation>>>>,
lamport_clock: Rc<RefCell<Lamport>>,
git: Rc<dyn GitProvider>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Operation {
StartEpoch {
epoch_id: EpochId,
head: Oid,
},
EpochOperation {
epoch_id: EpochId,
operation: FsOperation,
},
}
impl Operation {
fn stamp<T>(epoch_id: EpochId, operations: T) -> impl Iterator<Item = Operation>
where
T: IntoIterator<Item = FsOperation>,
{
operations
.into_iter()
.map(move |operation| Operation::EpochOperation {
epoch_id,
operation,
})
}
}
#[derive(Copy, Clone, Eq, Hash, PartialEq)]
pub struct BufferId(u32);
impl WorkTree {
pub fn new<G: 'static + GitProvider>(replica_id: ReplicaId, git: Rc<G>) -> WorkTree {
WorkTree {
epoch: Rc::new(RefCell::new(None)),
buffers: Rc::new(RefCell::new(HashMap::new())),
next_buffer_id: Rc::new(RefCell::new(BufferId(0))),
deferred_ops: Rc::new(RefCell::new(HashMap::new())),
lamport_clock: Rc::new(RefCell::new(Lamport::new(replica_id))),
git,
}
}
async fn init<I>(&self, base: Oid, ops: I) -> Result<Vec<Operation>, Error>
where
I: IntoIterator<Item = Operation> + 'static,
{
let mut ops = ops.into_iter().peekable();
if ops.peek().is_none() {
self.reset(base).await
} else {
self.apply_ops(ops).await
}
}
pub async fn reset(&self, head: Oid) -> Result<Vec<Operation>, Error> {
let epoch_id = self.lamport_clock.borrow_mut().tick();
let mut ops = vec![Operation::StartEpoch { epoch_id, head }];
ops.extend(self.start_epoch(epoch_id, head).await?);
Ok(ops)
}
pub async fn apply_ops<I>(&self, ops: I) -> Result<Vec<Operation>, Error>
where
I: IntoIterator<Item = Operation> + 'static,
{
let mut cur_epoch_ops = Vec::new();
let mut epoch_futures = Vec::new();
for op in ops {
match op {
Operation::StartEpoch { epoch_id, head } => {
epoch_futures.push(self.start_epoch(epoch_id, head));
}
Operation::EpochOperation {
epoch_id,
operation,
} => {
if let Some(epoch) = self.epoch.borrow().as_ref() {
match epoch_id.cmp(&epoch.epoch_id) {
Ordering::Less => {}
Ordering::Equal => cur_epoch_ops.push(operation),
Ordering::Greater => self.defer_epoch_op(epoch_id, operation),
}
} else {
self.defer_epoch_op(epoch_id, operation);
}
}
}
}
let mut fixup_ops = Vec::new();
if let Some(epoch) = self.epoch.borrow_mut().as_mut() {
fixup_ops.extend(Operation::stamp(
epoch.epoch_id,
epoch.apply_ops(cur_epoch_ops, &mut self.lamport_clock.borrow_mut())?,
));
}
for epoch_future in epoch_futures {
fixup_ops.extend(epoch_future.await?);
}
Ok(fixup_ops)
}
async fn start_epoch(
&self,
new_epoch_id: EpochId,
new_head: Oid,
) -> Result<Vec<Operation>, Error> {
let mut epoch_to_start = Some(Epoch::new(self.replica_id(), new_epoch_id, new_head));
{
let mut borrow_mut = self.epoch.borrow_mut();
if borrow_mut.is_none() {
*borrow_mut = epoch_to_start.take();
}
}
let mut fixup_ops = Vec::new();
if new_epoch_id >= self.epoch_id() {
let mut pending_base_entries = self.git.base_entries(new_head).chunks(500);
loop {
let (base_entries, next_pending_base_entries) =
pending_base_entries.into_future().await;
pending_base_entries = next_pending_base_entries;
if let Some(base_entries) = base_entries {
let mut unwrapped_entries = Vec::with_capacity(base_entries.len());
for base_entry in base_entries {
match base_entry {
Ok(base_entry) => unwrapped_entries.push(base_entry),
Err(error) => return Err(error.into()),
}
}
if let Some(epoch_to_start) = epoch_to_start.as_mut() {
epoch_to_start.append_base_entries(
unwrapped_entries,
&mut self.lamport_clock.borrow_mut(),
)?;
} else {
let mut epoch = self.cur_epoch_mut();
if new_epoch_id == epoch.epoch_id {
let epoch_fixup_ops = epoch.append_base_entries(
unwrapped_entries,
&mut self.lamport_clock.borrow_mut(),
)?;
fixup_ops.extend(Operation::stamp(new_epoch_id, epoch_fixup_ops));
}
}
} else {
break;
}
}
let mut cur_epoch = self.cur_epoch_mut();
if new_epoch_id > cur_epoch.epoch_id {
*cur_epoch = epoch_to_start.unwrap();
if let Some(ops) = self.deferred_ops.borrow_mut().remove(&new_epoch_id) {
let epoch_fixup_ops =
cur_epoch.apply_ops(ops, &mut self.lamport_clock.borrow_mut())?;
fixup_ops.extend(Operation::stamp(new_epoch_id, epoch_fixup_ops));
}
self.deferred_ops
.borrow_mut()
.retain(|id, _| *id > new_epoch_id);
}
}
Ok(fixup_ops)
}
pub fn version(&self) -> Global {
self.cur_epoch().version()
}
pub fn with_cursor<F>(&self, mut f: F)
where
F: FnMut(&mut Cursor<'_>),
{
if let Some(mut cursor) = self.cur_epoch().cursor() {
f(&mut cursor);
}
}
pub fn new_text_file(&self) -> (FileId, Operation) {
let mut cur_epoch = self.cur_epoch_mut();
let epoch_id = cur_epoch.epoch_id;
let (file_id, operation) = cur_epoch.new_text_file(&mut self.lamport_clock.borrow_mut());
(
file_id,
Operation::EpochOperation {
epoch_id,
operation,
},
)
}
pub fn create_dir<P>(&self, path: P) -> Result<(FileId, Operation), Error>
where
P: AsRef<Path>,
{
let path = path.as_ref();
let name = path
.file_name()
.ok_or(Error::InvalidPath("path has no file name".into()))?;
let mut cur_epoch = self.cur_epoch_mut();
let parent_id = if let Some(parent_path) = path.parent() {
cur_epoch.file_id(parent_path)?
} else {
ROOT_FILE_ID
};
let epoch_id = cur_epoch.epoch_id;
let (file_id, operation) =
cur_epoch.create_dir(parent_id, name, &mut self.lamport_clock.borrow_mut())?;
Ok((
file_id,
Operation::EpochOperation {
epoch_id,
operation,
},
))
}
pub fn create_file<P>(&self, path: P, file_type: FileType) -> Result<Operation, Error>
where
P: AsRef<Path>,
{
let path = path.as_ref();
let name = path
.file_name()
.ok_or(Error::InvalidPath("path has no file name".into()))?;
let mut cur_epoch = self.cur_epoch_mut();
let parent_id = if let Some(parent_path) = path.parent() {
cur_epoch.file_id(parent_path)?
} else {
ROOT_FILE_ID
};
let epoch_id = cur_epoch.epoch_id;
let operation = cur_epoch.create_file(
parent_id,
name,
file_type,
&mut self.lamport_clock.borrow_mut(),
)?;
Ok(Operation::EpochOperation {
epoch_id,
operation,
})
}
pub async fn open_text_file<P>(&self, path: P) -> Result<BufferId, Error>
where
P: Into<PathBuf>,
{
let path = path.into();
loop {
if let Some(buffer_id) = self.existing_buffer(&path) {
return Ok(buffer_id);
} else {
let epoch_id;
let epoch_head;
let file_id;
let base_path;
{
let cur_epoch = self.cur_epoch_mut();
epoch_id = cur_epoch.epoch_id;
epoch_head = cur_epoch.head;
file_id = cur_epoch.file_id(&path)?;
base_path = cur_epoch.base_path(file_id);
}
let base_text = if let Some(base_path) = base_path {
self.git.base_text(epoch_head, &base_path).await?
} else {
String::new()
};
if let Some(buffer_id) = self.existing_buffer(&path) {
return Ok(buffer_id);
} else if epoch_id == self.cur_epoch().epoch_id {
let mut cur_epoch = self.cur_epoch_mut();
cur_epoch.open_text_file(
file_id,
base_text.as_str(),
&mut self.lamport_clock.borrow_mut(),
)?;
let buffer_id = *self.next_buffer_id.borrow_mut();
self.next_buffer_id.borrow_mut().0 += 1;
self.buffers.borrow_mut().insert(buffer_id, file_id);
return Ok(buffer_id);
}
}
}
}
fn existing_buffer(&self, path: &Path) -> Option<BufferId> {
let cur_epoch = self.cur_epoch();
for (buffer_id, file_id) in self.buffers.borrow().iter() {
if let Some(existing_path) = cur_epoch.path(*file_id) {
if path == existing_path {
return Some(*buffer_id);
}
}
}
None
}
pub fn rename<P1, P2>(&self, old_path: P1, new_path: P2) -> Result<Operation, Error>
where
P1: AsRef<Path>,
P2: AsRef<Path>,
{
let old_path = old_path.as_ref();
let new_path = new_path.as_ref();
let mut cur_epoch = self.cur_epoch_mut();
let file_id = cur_epoch.file_id(old_path)?;
let new_name = new_path
.file_name()
.ok_or(Error::InvalidPath("new path has no file name".into()))?;
let new_parent_id = if let Some(parent_path) = new_path.parent() {
cur_epoch.file_id(parent_path)?
} else {
ROOT_FILE_ID
};
let epoch_id = cur_epoch.epoch_id;
let operation = cur_epoch.rename(
file_id,
new_parent_id,
new_name,
&mut self.lamport_clock.borrow_mut(),
)?;
Ok(Operation::EpochOperation {
epoch_id,
operation,
})
}
pub fn remove<P>(&self, path: P) -> Result<Operation, Error>
where
P: AsRef<Path>,
{
let mut cur_epoch = self.cur_epoch_mut();
let file_id = cur_epoch.file_id(path.as_ref())?;
let epoch_id = cur_epoch.epoch_id;
let operation = cur_epoch.remove(file_id, &mut self.lamport_clock.borrow_mut())?;
Ok(Operation::EpochOperation {
epoch_id,
operation,
})
}
pub fn edit<I, T>(
&self,
buffer_id: BufferId,
old_ranges: I,
new_text: T,
) -> Result<Operation, Error>
where
I: IntoIterator<Item = Range<usize>>,
T: Into<Text>,
{
let file_id = self.buffer_file_id(buffer_id)?;
let mut cur_epoch = self.cur_epoch_mut();
let epoch_id = cur_epoch.epoch_id;
let operation = cur_epoch
.edit(
file_id,
old_ranges,
new_text,
&mut self.lamport_clock.borrow_mut(),
)
.unwrap();
Ok(Operation::EpochOperation {
epoch_id,
operation,
})
}
pub fn edit_2d<I, T>(
&self,
buffer_id: BufferId,
old_ranges: I,
new_text: T,
) -> Result<Operation, Error>
where
I: IntoIterator<Item = Range<Point>>,
T: Into<Text>,
{
let file_id = self.buffer_file_id(buffer_id)?;
let mut cur_epoch = self.cur_epoch_mut();
let epoch_id = cur_epoch.epoch_id;
let operation = cur_epoch
.edit_2d(
file_id,
old_ranges,
new_text,
&mut self.lamport_clock.borrow_mut(),
)
.unwrap();
Ok(Operation::EpochOperation {
epoch_id,
operation,
})
}
pub fn path(&self, buffer_id: BufferId) -> Option<PathBuf> {
self.buffers
.borrow()
.get(&buffer_id)
.and_then(|file_id| self.cur_epoch().path(*file_id))
}
pub fn text(&self, buffer_id: BufferId) -> Result<buffer::Iter, Error> {
let file_id = self.buffer_file_id(buffer_id)?;
self.cur_epoch().text(file_id)
}
pub fn changes_since(
&self,
buffer_id: BufferId,
version: Global,
) -> Result<impl Iterator<Item = Change>, Error> {
let file_id = self.buffer_file_id(buffer_id)?;
self.cur_epoch().changes_since(file_id, version)
}
fn defer_epoch_op(&self, epoch_id: EpochId, operation: FsOperation) {
self.deferred_ops
.borrow_mut()
.entry(epoch_id)
.or_insert(Vec::new())
.push(operation);
}
fn replica_id(&self) -> ReplicaId {
self.lamport_clock.borrow().replica_id
}
fn buffer_file_id(&self, buffer_id: BufferId) -> Result<FileId, Error> {
self.buffers
.borrow()
.get(&buffer_id)
.cloned()
.ok_or(Error::InvalidBufferId)
}
pub fn exists<P>(&self, path: P) -> bool
where
P: AsRef<Path>,
{
self.cur_epoch().file_id(path).is_ok()
}
pub fn head(&self) -> Oid {
self.cur_epoch().head
}
pub fn epoch_id(&self) -> EpochId {
self.cur_epoch().epoch_id
}
fn cur_epoch(&self) -> Ref<'_, Epoch> {
Ref::map(self.epoch.borrow(), |a| a.as_ref().unwrap())
}
fn cur_epoch_mut(&self) -> RefMut<'_, Epoch> {
RefMut::map(self.epoch.borrow_mut(), |a| a.as_mut().unwrap())
}
}
#[cfg(test)]
mod test;