use crate::traits::BlockStore;
use bytes::Bytes;
use ipfrs_core::{Block, Cid, Error, Result};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
/// Serde `serialize_with` adapter: writes a [`Cid`] as the byte string produced by
/// `Cid::to_bytes`.
fn serialize_cid<S>(cid: &Cid, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let encoded = cid.to_bytes();
    serializer.serialize_bytes(&encoded)
}
fn deserialize_cid<'de, D>(deserializer: D) -> std::result::Result<Cid, D::Error>
where
D: Deserializer<'de>,
{
let bytes: Vec<u8> = Deserialize::deserialize(deserializer)?;
Cid::try_from(bytes).map_err(serde::de::Error::custom)
}
/// Serde `serialize_with` adapter: writes a slice of [`Cid`]s as a sequence whose elements
/// are the byte vectors returned by `Cid::to_bytes`.
fn serialize_cid_vec<S>(cids: &[Cid], serializer: S) -> std::result::Result<S::Ok, S::Error>
where
    S: Serializer,
{
    use serde::ser::SerializeSeq;

    let mut seq = serializer.serialize_seq(Some(cids.len()))?;
    cids.iter()
        .try_for_each(|cid| seq.serialize_element(&cid.to_bytes()))?;
    seq.end()
}
fn deserialize_cid_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<Cid>, D::Error>
where
D: Deserializer<'de>,
{
let bytes_vec: Vec<Vec<u8>> = Deserialize::deserialize(deserializer)?;
bytes_vec
.into_iter()
.map(|bytes| Cid::try_from(bytes).map_err(serde::de::Error::custom))
.collect()
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Commit {
#[serde(skip)]
pub cid: Option<Cid>,
#[serde(
serialize_with = "serialize_cid_vec",
deserialize_with = "deserialize_cid_vec"
)]
pub parents: Vec<Cid>,
#[serde(serialize_with = "serialize_cid", deserialize_with = "deserialize_cid")]
pub root: Cid,
pub message: String,
pub author: Author,
pub timestamp: u64,
pub metadata: HashMap<String, String>,
}
/// Identity recorded on a [`Commit`] as its author.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Author {
/// Author's name.
pub name: String,
/// Author's e-mail address, stored as a plain string (this type performs no validation).
pub email: String,
}
impl Commit {
    /// Creates an unfinalized commit (`cid` is `None`) stamped with the current system time.
    ///
    /// If the system clock reports a time before the Unix epoch the timestamp is recorded
    /// as `0` instead of panicking.
    pub fn new(
        parents: Vec<Cid>,
        root: Cid,
        message: String,
        author: Author,
        metadata: HashMap<String, String>,
    ) -> Self {
        // `duration_since` only fails when the clock is set before 1970; that is an
        // environment quirk, not a reason to abort the caller.
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_secs());
        Self {
            cid: None,
            parents,
            root,
            message,
            author,
            timestamp,
            metadata,
        }
    }

    /// Computes the commit's CID from its serialized form, stores it in `self.cid`
    /// and returns it.
    ///
    /// # Errors
    ///
    /// Propagates any failure from [`Commit::to_block`].
    pub fn finalize(&mut self) -> Result<Cid> {
        // Reuse `to_block` so the CID is always derived from exactly the bytes that
        // would be stored.
        let cid = *self.to_block()?.cid();
        self.cid = Some(cid);
        Ok(cid)
    }

    /// Decodes a commit from `block` and records the block's CID as the commit's CID.
    ///
    /// # Errors
    ///
    /// Returns `Error::Serialization` when the block data cannot be decoded as a commit.
    pub fn from_block(block: &Block) -> Result<Self> {
        let mut commit: Commit =
            oxicode::serde::decode_owned_from_slice(block.data(), oxicode::config::standard())
                .map(|(decoded, _)| decoded)
                .map_err(|e| Error::Serialization(format!("Failed to deserialize commit: {e}")))?;
        commit.cid = Some(*block.cid());
        Ok(commit)
    }

    /// Serializes the commit (without the `cid` field) and wraps the bytes in a [`Block`].
    ///
    /// # Errors
    ///
    /// Returns `Error::Serialization` when encoding fails, or whatever `Block::new` reports.
    pub fn to_block(&self) -> Result<Block> {
        let bytes = oxicode::serde::encode_to_vec(self, oxicode::config::standard())
            .map_err(|e| Error::Serialization(format!("Failed to serialize commit: {e}")))?;
        Block::new(Bytes::from(bytes))
    }

    /// Returns `true` when the commit has no parents.
    pub fn is_initial(&self) -> bool {
        self.parents.is_empty()
    }

    /// Returns the commit's CID.
    ///
    /// # Panics
    ///
    /// Panics if the CID has not been set yet, i.e. the commit was neither finalized
    /// with [`Commit::finalize`] nor loaded with [`Commit::from_block`].
    pub fn cid(&self) -> &Cid {
        self.cid.as_ref().expect("Commit not finalized")
    }
}
/// A named pointer to a commit, typed as either a branch or a tag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Ref {
/// Name of the reference.
pub name: String,
/// CID of the commit the reference points at; (de)serialized through the CID byte adapters.
#[serde(serialize_with = "serialize_cid", deserialize_with = "deserialize_cid")]
pub commit: Cid,
/// Whether the reference is a branch or a tag.
pub ref_type: RefType,
}
/// Kind of a [`Ref`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum RefType {
/// A branch reference.
Branch,
/// A tag reference.
Tag,
}
impl Ref {
pub fn branch(name: String, commit: Cid) -> Self {
Self {
name,
commit,
ref_type: RefType::Branch,
}
}
pub fn tag(name: String, commit: Cid) -> Self {
Self {
name,
commit,
ref_type: RefType::Tag,
}
}
pub fn to_block(&self) -> Result<Block> {
let bytes = oxicode::serde::encode_to_vec(self, oxicode::config::standard())
.map_err(|e| Error::Serialization(format!("Failed to serialize ref: {e}")))?;
Block::new(Bytes::from(bytes))
}
pub fn from_block(block: &Block) -> Result<Self> {
oxicode::serde::decode_owned_from_slice(block.data(), oxicode::config::standard())
.map(|(v, _)| v)
.map_err(|e| Error::Serialization(format!("Failed to deserialize ref: {e}")))
}
}
/// Version-control state layered over a [`BlockStore`]: the current branch name, the HEAD
/// commit and an in-memory table mapping ref names to commit CIDs.
pub struct VersionControl<S: BlockStore> {
/// Store that commit and ref blocks are written to and commits are read from.
store: Arc<S>,
/// Name of the branch that new commits advance.
current_branch: parking_lot::RwLock<String>,
/// CID of the current HEAD commit, or `None` when there is none yet.
head: parking_lot::RwLock<Option<Cid>>,
/// In-memory map from ref name to the CID of the commit it points at.
refs_cache: dashmap::DashMap<String, Cid>,
}
/// Strategy passed to `VersionControl::merge`.
///
/// Whatever the strategy, `merge` fast-forwards when HEAD is an ancestor of the commit
/// being merged; the strategy only matters once that is not possible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeStrategy {
/// Only fast-forward; `merge` returns an error when a fast-forward is not possible.
FastForward,
/// Creates a merge commit.
/// NOTE(review): as written, `merge` takes the merged commit's root for this strategy,
/// exactly like `Theirs`; no content-level merge is performed.
ThreeWay,
/// Creates a merge commit whose root is HEAD's root.
Ours,
/// Creates a merge commit whose root is the merged commit's root.
Theirs,
}
/// Outcome reported by `VersionControl::merge`.
#[derive(Debug, Clone, PartialEq)]
pub enum MergeResult {
/// No merge commit was created; HEAD is at `target`.
FastForward { target: Cid },
/// A merge commit with CID `commit` was created and became HEAD.
MergeCommit { commit: Cid },
/// Conflict descriptions.
/// NOTE(review): nothing visible in this file constructs this variant.
Conflicts { conflicts: Vec<String> },
}
impl<S: BlockStore> VersionControl<S> {
/// Creates a version-control handle over `store` with no HEAD, an empty ref table and
/// `"main"` as the current branch name.
pub fn new(store: Arc<S>) -> Self {
    let current_branch = parking_lot::RwLock::new(String::from("main"));
    let head = parking_lot::RwLock::new(None);
    let refs_cache = dashmap::DashMap::new();
    Self {
        store,
        current_branch,
        head,
        refs_cache,
    }
}
/// Returns a snapshot of every `(ref name, commit CID)` pair in the in-memory ref table.
/// The order of the returned pairs is whatever order the table iterates in.
pub fn list_refs(&self) -> Vec<(String, Cid)> {
    let mut refs = Vec::new();
    for entry in self.refs_cache.iter() {
        let name = entry.key().clone();
        let commit = *entry.value();
        refs.push((name, commit));
    }
    refs
}
/// Records a new commit on top of the current HEAD and advances the current branch to it.
///
/// The new commit's only parent is the current HEAD (no parents when there is no HEAD).
/// The commit block is written to the store, HEAD is moved, and the current branch ref is
/// updated, in that order.
///
/// # Errors
///
/// Propagates serialization failures and any error from the store.
pub async fn commit(
    &self,
    root: Cid,
    message: String,
    author: Author,
    metadata: HashMap<String, String>,
) -> Result<Cid> {
    let parents: Vec<Cid> = (*self.head.read()).into_iter().collect();
    let commit = Commit::new(parents, root, message, author, metadata);

    // Serialize exactly once: the CID of the resulting block is the commit's CID, so
    // there is no need to encode the commit a second time just to learn its identifier.
    let commit_block = commit.to_block()?;
    let commit_cid = *commit_block.cid();
    self.store.put(&commit_block).await?;

    *self.head.write() = Some(commit_cid);
    let branch_name = self.current_branch.read().clone();
    self.update_ref(&branch_name, commit_cid, RefType::Branch)
        .await?;
    Ok(commit_cid)
}
pub async fn checkout(&self, commit_cid: &Cid) -> Result<Cid> {
let commit_block = self
.store
.get(commit_cid)
.await?
.ok_or_else(|| Error::NotFound(format!("Commit not found: {commit_cid}")))?;
let commit = Commit::from_block(&commit_block)?;
*self.head.write() = Some(*commit_cid);
Ok(commit.root)
}
/// Checks out the commit that `ref_name` points at and returns its root CID. When the
/// reference is reported as a branch, it also becomes the current branch.
///
/// # Errors
///
/// Returns `Error::NotFound` when the ref is unknown, and propagates any error from
/// `checkout`. On failure neither HEAD nor the current branch is modified.
pub async fn checkout_ref(&self, ref_name: &str) -> Result<Cid> {
    let target = self.get_ref(ref_name).await?;
    // Move HEAD first: if the commit cannot be loaded the error propagates here, before
    // the current branch is touched, so the two never end up out of step.
    let root = self.checkout(&target.commit).await?;
    if target.ref_type == RefType::Branch {
        *self.current_branch.write() = ref_name.to_string();
    }
    Ok(root)
}
pub async fn create_branch(&self, branch_name: &str) -> Result<()> {
let head = self
.head
.read()
.ok_or_else(|| Error::Storage("No HEAD commit".to_string()))?;
self.update_ref(branch_name, head, RefType::Branch).await
}
pub async fn create_tag(&self, tag_name: &str) -> Result<()> {
let head = self
.head
.read()
.ok_or_else(|| Error::Storage("No HEAD commit".to_string()))?;
self.update_ref(tag_name, head, RefType::Tag).await
}
/// Records that ref `name` points at `commit`: updates the in-memory ref table, then
/// writes a serialized [`Ref`] block to the store.
///
/// NOTE(review): the in-memory entry is written before the store write, so it remains in
/// place even when serialization or the store write fails.
///
/// # Errors
///
/// Propagates serialization failures and any error from the store.
async fn update_ref(&self, name: &str, commit: Cid, ref_type: RefType) -> Result<()> {
    let owned_name = name.to_string();
    self.refs_cache.insert(owned_name.clone(), commit);

    let ref_block = Ref {
        name: owned_name,
        commit,
        ref_type,
    }
    .to_block()?;
    self.store.put(&ref_block).await?;
    Ok(())
}
#[allow(clippy::unused_async)]
async fn get_ref(&self, name: &str) -> Result<Ref> {
if let Some(commit_cid) = self.refs_cache.get(name) {
let ref_type = if name.starts_with("refs/tags/") || name.contains("/tags/") {
RefType::Tag
} else {
RefType::Branch
};
return Ok(Ref {
name: name.to_string(),
commit: *commit_cid,
ref_type,
});
}
Err(Error::NotFound(format!("Ref not found: {name}")))
}
/// Returns the CID of the current HEAD commit, or `None` when there is none.
pub fn head(&self) -> Option<Cid> {
    let current = self.head.read();
    *current
}
/// Returns a copy of the current branch name.
pub fn current_branch(&self) -> String {
    let branch = self.current_branch.read();
    branch.clone()
}
pub async fn log(&self, commit_cid: &Cid, limit: usize) -> Result<Vec<Commit>> {
let mut commits = Vec::new();
let mut current = Some(*commit_cid);
while let Some(cid) = current {
if commits.len() >= limit {
break;
}
let commit_block = self
.store
.get(&cid)
.await?
.ok_or_else(|| Error::NotFound(format!("Commit not found: {cid}")))?;
let commit = Commit::from_block(&commit_block)?;
current = commit.parents.first().copied();
commits.push(commit);
}
Ok(commits)
}
pub fn store(&self) -> &Arc<S> {
&self.store
}
/// Finds a commit reachable from both `commit1` and `commit2` (each commit counts as
/// reachable from itself), or `None` when the histories are unrelated.
///
/// When several common ancestors exist, the one returned is the first met in a
/// breadth-first walk from `commit2`, i.e. one with the fewest parent hops from `commit2`.
/// In histories with criss-cross merges there can be more than one equally valid merge
/// base; only one is reported.
///
/// # Errors
///
/// Returns `Error::NotFound` when a visited commit is missing from the store, and
/// propagates store and decoding errors.
pub async fn find_common_ancestor(&self, commit1: &Cid, commit2: &Cid) -> Result<Option<Cid>> {
    // Phase 1: collect everything reachable from `commit1`. Visit order is irrelevant
    // here, so a plain stack is fine.
    let mut ancestors1 = std::collections::HashSet::new();
    let mut pending = vec![*commit1];
    while let Some(cid) = pending.pop() {
        if !ancestors1.insert(cid) {
            continue;
        }
        let block = self
            .store
            .get(&cid)
            .await?
            .ok_or_else(|| Error::NotFound(format!("Commit not found: {cid}")))?;
        let commit = Commit::from_block(&block)?;
        pending.extend(commit.parents.iter().copied());
    }

    // Phase 2: walk `commit2`'s history breadth-first so that nearer commits are tested
    // before more distant ones. A depth-first walk could run all the way down one parent
    // chain and report a far-away shared ancestor while a closer one sits on a sibling
    // parent.
    let mut frontier = std::collections::VecDeque::new();
    frontier.push_back(*commit2);
    let mut visited = std::collections::HashSet::new();
    while let Some(cid) = frontier.pop_front() {
        if !visited.insert(cid) {
            continue;
        }
        if ancestors1.contains(&cid) {
            return Ok(Some(cid));
        }
        let block = self
            .store
            .get(&cid)
            .await?
            .ok_or_else(|| Error::NotFound(format!("Commit not found: {cid}")))?;
        let commit = Commit::from_block(&block)?;
        frontier.extend(commit.parents.iter().copied());
    }
    Ok(None)
}
pub async fn is_ancestor(&self, ancestor: &Cid, descendant: &Cid) -> Result<bool> {
if ancestor == descendant {
return Ok(true);
}
let mut queue = vec![*descendant];
let mut visited = std::collections::HashSet::new();
while let Some(cid) = queue.pop() {
if !visited.insert(cid) {
continue;
}
if &cid == ancestor {
return Ok(true);
}
let block = self
.store
.get(&cid)
.await?
.ok_or_else(|| Error::NotFound(format!("Commit not found: {cid}")))?;
let commit = Commit::from_block(&block)?;
queue.extend(commit.parents.iter().copied());
}
Ok(false)
}
pub async fn merge(
&self,
branch_cid: &Cid,
message: String,
author: Author,
strategy: MergeStrategy,
) -> Result<MergeResult> {
let head_cid = self
.head
.read()
.ok_or_else(|| Error::Storage("No HEAD commit".to_string()))?;
if &head_cid == branch_cid {
return Ok(MergeResult::FastForward { target: head_cid });
}
if self.is_ancestor(&head_cid, branch_cid).await? {
*self.head.write() = Some(*branch_cid);
let branch_name = self.current_branch.read().clone();
self.refs_cache.insert(branch_name.clone(), *branch_cid);
return Ok(MergeResult::FastForward {
target: *branch_cid,
});
}
if strategy == MergeStrategy::FastForward {
return Err(Error::Storage(
"Fast-forward not possible, branches have diverged".to_string(),
));
}
let head_block = self
.store
.get(&head_cid)
.await?
.ok_or_else(|| Error::NotFound(format!("HEAD commit not found: {head_cid}")))?;
let head_commit = Commit::from_block(&head_block)?;
let branch_block = self
.store
.get(branch_cid)
.await?
.ok_or_else(|| Error::NotFound(format!("Branch commit not found: {branch_cid}")))?;
let branch_commit = Commit::from_block(&branch_block)?;
match strategy {
MergeStrategy::ThreeWay | MergeStrategy::Ours | MergeStrategy::Theirs => {
let merge_root = match strategy {
MergeStrategy::Ours => head_commit.root,
MergeStrategy::Theirs => branch_commit.root,
MergeStrategy::ThreeWay => {
branch_commit.root
}
_ => unreachable!(),
};
let mut merge_commit = Commit::new(
vec![head_cid, *branch_cid],
merge_root,
message,
author,
HashMap::new(),
);
let merge_cid = merge_commit.finalize()?;
let merge_block = merge_commit.to_block()?;
self.store.put(&merge_block).await?;
*self.head.write() = Some(merge_cid);
let branch_name = self.current_branch.read().clone();
self.refs_cache.insert(branch_name.clone(), merge_cid);
Ok(MergeResult::MergeCommit { commit: merge_cid })
}
MergeStrategy::FastForward => unreachable!(), }
}
pub async fn merge_branch(
&self,
branch_name: &str,
message: String,
author: Author,
strategy: MergeStrategy,
) -> Result<MergeResult> {
let branch_ref = self.get_ref(branch_name).await?;
self.merge(&branch_ref.commit, message, author, strategy)
.await
}
}
/// Step-by-step builder for a [`Commit`]; root, message and author must be set before
/// `build` succeeds.
pub struct CommitBuilder {
/// Parent commit CIDs; empty unless set.
parents: Vec<Cid>,
/// Root CID of the commit; required by `build`.
root: Option<Cid>,
/// Commit message; required by `build`.
message: Option<String>,
/// Commit author; required by `build`.
author: Option<Author>,
/// Key/value metadata accumulated through `metadata` calls.
metadata: HashMap<String, String>,
}
impl CommitBuilder {
pub fn new() -> Self {
Self {
parents: Vec::new(),
root: None,
message: None,
author: None,
metadata: HashMap::new(),
}
}
#[must_use]
pub fn parents(mut self, parents: Vec<Cid>) -> Self {
self.parents = parents;
self
}
#[must_use]
pub fn root(mut self, root: Cid) -> Self {
self.root = Some(root);
self
}
#[must_use]
pub fn message(mut self, message: String) -> Self {
self.message = Some(message);
self
}
#[must_use]
pub fn author(mut self, author: Author) -> Self {
self.author = Some(author);
self
}
#[must_use]
pub fn metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
pub fn build(self) -> Result<Commit> {
let root = self
.root
.ok_or_else(|| Error::Storage("Root CID is required".to_string()))?;
let message = self
.message
.ok_or_else(|| Error::Storage("Commit message is required".to_string()))?;
let author = self
.author
.ok_or_else(|| Error::Storage("Author is required".to_string()))?;
Ok(Commit::new(
self.parents,
root,
message,
author,
self.metadata,
))
}
}
impl Default for CommitBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    /// Author fixture shared by the tests below.
    fn test_author() -> Author {
        Author {
            name: "Test User".to_string(),
            email: "test@example.com".to_string(),
        }
    }

    /// Opens an empty sled-backed store in a per-test directory under the platform's
    /// temporary directory (rather than a hard-coded `/tmp`, which does not exist on
    /// every platform).
    fn fresh_store(test_name: &str) -> Arc<SledBlockStore> {
        let path: PathBuf = std::env::temp_dir().join(format!("ipfrs-vcs-test-{test_name}"));
        // Best-effort cleanup of a previous run; a missing directory is fine.
        let _ = std::fs::remove_dir_all(&path);
        let config = BlockStoreConfig {
            path,
            cache_size: 10 * 1024 * 1024,
        };
        Arc::new(SledBlockStore::new(config).unwrap())
    }

    /// Stores a block holding `contents` and returns its CID.
    async fn put_model(store: &SledBlockStore, contents: &str) -> Cid {
        let block = Block::new(Bytes::from(contents.to_owned())).unwrap();
        store.put(&block).await.unwrap();
        *block.cid()
    }

    #[tokio::test]
    async fn test_commit_creation() {
        let root_block = Block::new(Bytes::from("model weights")).unwrap();
        let root_cid = *root_block.cid();
        let mut commit = Commit::new(
            vec![],
            root_cid,
            "Initial commit".to_string(),
            test_author(),
            HashMap::new(),
        );
        let commit_cid = commit.finalize().unwrap();
        assert!(commit.cid.is_some());
        assert_eq!(commit.cid(), &commit_cid);
        assert!(commit.is_initial());
    }

    #[tokio::test]
    async fn test_commit_serialization() {
        let root_block = Block::new(Bytes::from("model weights")).unwrap();
        let root_cid = *root_block.cid();
        let mut commit = Commit::new(
            vec![],
            root_cid,
            "Initial commit".to_string(),
            test_author(),
            HashMap::new(),
        );
        commit.finalize().unwrap();
        let commit_block = commit.to_block().unwrap();
        let deserialized = Commit::from_block(&commit_block).unwrap();
        assert_eq!(commit, deserialized);
    }

    #[tokio::test]
    async fn test_version_control_initial_commit() {
        let store = fresh_store("initial");
        let vcs = VersionControl::new(store.clone());
        let model_cid = put_model(&store, "model v1").await;

        let commit_cid = vcs
            .commit(
                model_cid,
                "Initial commit".to_string(),
                test_author(),
                HashMap::new(),
            )
            .await
            .unwrap();

        assert_eq!(vcs.head(), Some(commit_cid));
        let commit_block = store.get(&commit_cid).await.unwrap().unwrap();
        let commit = Commit::from_block(&commit_block).unwrap();
        assert_eq!(commit.root, model_cid);
        assert_eq!(commit.message, "Initial commit");
        assert!(commit.is_initial());
    }

    #[tokio::test]
    async fn test_version_control_multiple_commits() {
        let store = fresh_store("multiple");
        let vcs = VersionControl::new(store.clone());

        let model1 = put_model(&store, "model v1").await;
        let commit1 = vcs
            .commit(
                model1,
                "First commit".to_string(),
                test_author(),
                HashMap::new(),
            )
            .await
            .unwrap();

        let model2 = put_model(&store, "model v2").await;
        let commit2 = vcs
            .commit(
                model2,
                "Second commit".to_string(),
                test_author(),
                HashMap::new(),
            )
            .await
            .unwrap();

        assert_eq!(vcs.head(), Some(commit2));
        let commit2_block = store.get(&commit2).await.unwrap().unwrap();
        let commit2_obj = Commit::from_block(&commit2_block).unwrap();
        assert_eq!(commit2_obj.parents, vec![commit1]);
    }

    #[tokio::test]
    async fn test_checkout() {
        let store = fresh_store("checkout");
        let vcs = VersionControl::new(store.clone());

        let model1 = put_model(&store, "model v1").await;
        let commit1 = vcs
            .commit(model1, "First".to_string(), test_author(), HashMap::new())
            .await
            .unwrap();

        let model2 = put_model(&store, "model v2").await;
        let _commit2 = vcs
            .commit(model2, "Second".to_string(), test_author(), HashMap::new())
            .await
            .unwrap();

        let root = vcs.checkout(&commit1).await.unwrap();
        assert_eq!(root, model1);
        assert_eq!(vcs.head(), Some(commit1));
    }

    #[tokio::test]
    async fn test_commit_log() {
        let store = fresh_store("log");
        let vcs = VersionControl::new(store.clone());

        let mut commits = Vec::new();
        for i in 1..=3 {
            let model = put_model(&store, &format!("model v{i}")).await;
            let commit = vcs
                .commit(model, format!("Commit {i}"), test_author(), HashMap::new())
                .await
                .unwrap();
            commits.push(commit);
        }

        let log = vcs.log(&commits[2], 10).await.unwrap();
        assert_eq!(log.len(), 3);
        assert_eq!(log[0].message, "Commit 3");
        assert_eq!(log[1].message, "Commit 2");
        assert_eq!(log[2].message, "Commit 1");
    }

    #[test]
    fn test_commit_builder() {
        let author = Author {
            name: "Builder".to_string(),
            email: "builder@example.com".to_string(),
        };
        let root_block = Block::new(Bytes::from("root")).unwrap();
        let commit = CommitBuilder::new()
            .root(*root_block.cid())
            .message("Test commit".to_string())
            .author(author.clone())
            .metadata("key1".to_string(), "value1".to_string())
            .build()
            .unwrap();
        assert_eq!(commit.message, "Test commit");
        assert_eq!(commit.author, author);
        assert_eq!(commit.metadata.get("key1").unwrap(), "value1");
    }
}