Crate samod

§Samod

samod is a library for building collaborative applications which work offline and don’t require servers (though servers certainly can be useful). This is achieved by representing data as automerge documents. samod is wire-compatible with the automerge-repo JavaScript library.

§What does all that mean?

samod helps you manage automerge “documents”, which are hierarchical data structures composed of maps, lists, text, and primitive values, a little like JSON. Every change you make to a document is recorded, and you can move back and forth through the history of a document; it’s a bit like Git for JSON. samod takes care of storing changes for you and synchronizing them with connected peers. The interesting part is that, because this detailed history is never discarded, we can merge documents containing changes that were made concurrently. This means we can build applications which allow multiple users to edit the same document without routing every change through a server.
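To make the idea of merging from a retained history concrete, here is a deliberately tiny toy model in plain Rust. It is not automerge’s data model or merge algorithm: the Change and ToyDoc types are hypothetical, a document is just the set of every change ever made, and merging is set union. That is why two replicas converge regardless of the order in which they exchange changes.

```rust
use std::collections::BTreeSet;

/// One recorded change in a toy history: who made it, that author's
/// sequence number, and an opaque operation. Hypothetical model only;
/// automerge's real change format and merge rules are far richer.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Change {
    pub seq: u64,
    pub actor: String,
    pub op: String,
}

/// A toy document is just the set of every change ever made.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ToyDoc {
    pub changes: BTreeSet<Change>,
}

impl ToyDoc {
    /// Record a new change by `actor`, numbering it after that actor's
    /// previous changes.
    pub fn apply(&mut self, actor: &str, op: &str) {
        let seq = self.changes.iter().filter(|c| c.actor == actor).count() as u64 + 1;
        self.changes.insert(Change { seq, actor: actor.to_string(), op: op.to_string() });
    }

    /// Merge another replica's history. Set union is commutative and
    /// idempotent, so replicas converge whatever order they merge in.
    pub fn merge(&mut self, other: &ToyDoc) {
        self.changes.extend(other.changes.iter().cloned());
    }

    /// Replay the history in one deterministic order (the derived ordering
    /// sorts by sequence number, then actor) so every replica agrees.
    pub fn ops(&self) -> Vec<&str> {
        self.changes.iter().map(|c| c.op.as_str()).collect()
    }
}
```

Here Alice and Bob edit concurrently after sharing one change; merging in either direction yields the same document.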

§How it all works

The library is structured around a Repo, which talks to a Storage instance and which you can connect to other peers using Repo::connect. Once you have a Repo you can create documents using Repo::create, or look up existing documents using Repo::find. In either case you will get back a DocHandle which you can use to interact with the document.

Typically then, your workflow will look like this:

  • Initialize a Repo at application startup, passing it a RuntimeHandle implementation and a Storage implementation
  • Whenever you have connections available (maybe you are connecting to a sync server, maybe you are receiving peer-to-peer connections) you call Repo::connect to drive the connection state.
  • Create DocHandles using Repo::create and look up existing documents using Repo::find
  • Modify documents using DocHandle::with_document

Let’s walk through each of those steps.

§Initializing a Repo

To initialize a Repo you call Repo::builder() to obtain a RepoBuilder which you use to configure the repo before calling RepoBuilder::load() to actually load the repository. For example:

// Build a repo on the current tokio runtime, backed by in-memory storage
let repo = samod::Repo::builder(tokio::runtime::Handle::current())
    .with_storage(samod::storage::InMemoryStorage::new())
    .load()
    .await;

The first argument to builder is an implementation of RuntimeHandle. Implementations are provided for tokio and gio, and can be used conveniently via Repo::build_tokio and Repo::build_gio respectively. The RuntimeHandle trait is straightforward to implement if you want to use some other async runtime.
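As an illustration of what adapting another runtime involves, the sketch below defines a hypothetical SpawnHandle trait (an assumption standing in for RuntimeHandle, whose real methods may differ) and implements it with a minimal thread-per-task executor built only on the standard library. The point it shows is the Send bound on spawned futures, which is discussed further under Runtimes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical runtime abstraction; NOT samod's actual RuntimeHandle API.
pub trait SpawnHandle {
    fn spawn(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}

/// Waker that unparks the thread which is driving the future.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll the future on the current thread and park
/// between polls until it is woken.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// A toy "runtime" that gives every spawned task its own OS thread.
/// Moving the future to another thread is exactly why it must be Send.
pub struct ThreadPerTask;

impl SpawnHandle for ThreadPerTask {
    fn spawn(&self, fut: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
        thread::spawn(move || block_on(fut));
    }
}
```

A production runtime would multiplex many tasks onto a few threads; one thread per task just keeps the sketch short.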

By default samod uses an in-memory storage implementation. This is great for prototyping, but in most cases you will want to persist data somewhere. In that case you’ll need an implementation of Storage to pass to RepoBuilder::with_storage.
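The following sketch shows the general shape of a key/value storage backend. The KvStorage trait, the Key type (a path of string segments) and MemStore are all hypothetical, loosely modeled on the key/value interface of automerge-repo storage adapters. samod’s real Storage trait is async and its exact methods may differ, so consult its documentation before implementing one.

```rust
use std::collections::BTreeMap;

/// Hypothetical key type: a path of string segments,
/// e.g. ["doc1", "snapshot", "abc"].
pub type Key = Vec<String>;

/// Helper to build a key from string slices.
pub fn key(parts: &[&str]) -> Key {
    parts.iter().map(|p| p.to_string()).collect()
}

/// Hypothetical synchronous storage interface (samod's real trait is async).
pub trait KvStorage {
    fn load(&self, key: &Key) -> Option<Vec<u8>>;
    fn load_range(&self, prefix: &Key) -> Vec<(Key, Vec<u8>)>;
    fn put(&mut self, key: Key, data: Vec<u8>);
    fn delete(&mut self, key: &Key);
}

/// In-memory backend; a persistent one would perform the same
/// operations against files or a database.
#[derive(Default)]
pub struct MemStore {
    map: BTreeMap<Key, Vec<u8>>,
}

impl KvStorage for MemStore {
    fn load(&self, key: &Key) -> Option<Vec<u8>> {
        self.map.get(key).cloned()
    }

    fn load_range(&self, prefix: &Key) -> Vec<(Key, Vec<u8>)> {
        // Return every entry whose key starts with all segments of `prefix`.
        self.map
            .iter()
            .filter(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    fn put(&mut self, key: Key, data: Vec<u8>) {
        self.map.insert(key, data);
    }

    fn delete(&mut self, key: &Key) {
        self.map.remove(key);
    }
}
```

Range loads by prefix let a repo fetch every stored chunk belonging to one document in a single call.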

It is possible to use Storage and AnnouncePolicy implementations which do not produce Send futures. In this case you will also need a runtime which can spawn non-Send futures. See the runtimes section for more details.

§Connecting to peers

Once you have a Repo you can connect it to peers using Repo::connect. This method returns a future which must be driven to completion to run the connection. Here’s an example where we use futures::channel::mpsc as the transport. We create two repos and connect them with these channels.

use samod::ConnDirection;
use futures::{StreamExt, channel::mpsc};
use std::convert::Infallible;

tokio_test::block_on(async {
let alice = samod::Repo::build_tokio().load().await;
let bob = samod::Repo::build_tokio().load().await;

// Set up bidirectional channels
let (tx_to_bob, rx_from_alice) = mpsc::unbounded();
let (tx_to_alice, rx_from_bob) = mpsc::unbounded();
// connect expects a stream of Results, so wrap each message in Ok
let rx_from_alice = rx_from_alice.map(Ok::<_, Infallible>);
let rx_from_bob = rx_from_bob.map(Ok::<_, Infallible>);

// Run the connection futures
tokio::spawn(alice.connect(rx_from_bob, tx_to_bob, ConnDirection::Outgoing));
tokio::spawn(bob.connect(rx_from_alice, tx_to_alice, ConnDirection::Incoming));
});

If you are using tokio and connecting to something like a TCP socket you can use Repo::connect_tokio_io, which reduces some boilerplate:

use samod::ConnDirection;

let repo: samod::Repo = todo!();
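// Placeholder address of a peer accepting raw TCP connections;
// TcpStream::connect needs an explicit host:port
let io = tokio::net::TcpStream::connect("localhost:8080").await.unwrap();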
tokio::spawn(repo.connect_tokio_io(io, ConnDirection::Outgoing));

If you are connecting to a JavaScript sync server using WebSockets you can enable the tungstenite feature and use Repo::connect_websocket. If you are accepting WebSocket connections in an axum server you can use Repo::accept_axum.

§Managing Documents

Once you have a Repo you can use it to manage DocHandles. A DocHandle represents an automerge document which the Repo is managing. “Managing” here means a few things:

  • Any changes made to the document using DocHandle::with_document will be persisted to storage and synchronized with connected peers (subject to the AnnouncePolicy).
  • Any changes received from connected peers will be applied to the document and made visible to the application. You can listen for these changes using DocHandle::changes.

To create a new document you use Repo::create which will return once the document has been persisted to storage. To look up an existing document you use Repo::find. This will first look in storage, then if the document is not found in storage it will request the document from all connected peers (again subject to the AnnouncePolicy). If any peer has the document the future returned by Repo::find will resolve once we have synchronized with at least one remote peer which has the document.
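The lookup order described above can be sketched as a toy function. Everything here is hypothetical: DocId, Doc, Peer and find stand in for samod’s real types, the may_ask closure plays the role of the AnnouncePolicy, and the real Repo::find is asynchronous and synchronizes with peers rather than copying bytes.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for samod's DocumentId and document contents.
pub type DocId = String;
pub type Doc = Vec<u8>;

/// A toy remote peer holding some documents.
pub struct Peer {
    pub name: String,
    pub docs: HashMap<DocId, Doc>,
}

/// Toy lookup: local storage first, then each peer the policy lets us ask.
/// A document fetched from a peer is saved locally. Returns None when
/// nobody has it.
pub fn find(
    storage: &mut HashMap<DocId, Doc>,
    peers: &[Peer],
    id: &str,
    may_ask: impl Fn(&Peer) -> bool,
) -> Option<Doc> {
    if let Some(doc) = storage.get(id) {
        return Some(doc.clone());
    }
    for peer in peers {
        if !may_ask(peer) {
            continue; // the policy forbids asking this peer
        }
        if let Some(doc) = peer.docs.get(id) {
            storage.insert(id.to_string(), doc.clone());
            return Some(doc.clone());
        }
    }
    None
}
```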

You can make changes to a document using DocHandle::with_document.
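DocHandle::with_document gives you access to the document only inside a closure. The sketch below shows that closure-scoped pattern with a hypothetical ToyHandle built on Arc<Mutex<_>>, using a standard-library channel as a stand-in for the change stream returned by DocHandle::changes. Note that this toy notifies listeners after every call, whether or not the document actually changed.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

struct Inner<D> {
    doc: D,
    listeners: Vec<Sender<()>>,
}

/// Hypothetical handle illustrating closure-scoped access; not samod's DocHandle.
pub struct ToyHandle<D> {
    inner: Arc<Mutex<Inner<D>>>,
}

// Clones share the same underlying document.
impl<D> Clone for ToyHandle<D> {
    fn clone(&self) -> Self {
        ToyHandle { inner: Arc::clone(&self.inner) }
    }
}

impl<D> ToyHandle<D> {
    pub fn new(doc: D) -> Self {
        ToyHandle { inner: Arc::new(Mutex::new(Inner { doc, listeners: Vec::new() })) }
    }

    /// Run `f` with exclusive access to the document, then notify listeners,
    /// dropping any whose receiver has gone away.
    pub fn with_document<R>(&self, f: impl FnOnce(&mut D) -> R) -> R {
        let mut inner = self.inner.lock().unwrap();
        let out = f(&mut inner.doc);
        inner.listeners.retain(|tx| tx.send(()).is_ok());
        out
    }

    /// Subscribe to change notifications.
    pub fn changes(&self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.inner.lock().unwrap().listeners.push(tx);
        rx
    }
}
```

Keeping the lock inside with_document means callers can never hold a reference to the document past the closure.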

§Announce Policies

By default, samod will announce all the DocHandles it is synchronizing to all connected peers and will also send requests to any connected peers when you call Repo::find. This is often not what you want. To customize this logic you pass an implementation of AnnouncePolicy to RepoBuilder::with_announce_policy. Note that AnnouncePolicy is implemented for closures which take a document ID and a peer ID and return a bool, so you can just pass a closure if you want.

let authorized_peer = samod::PeerId::from("alice");
let repo = samod::Repo::build_tokio().with_announce_policy(move |_doc_id, peer_id| {
    // Only announce documents to alice
    peer_id == authorized_peer
}).load().await;
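To show how a closure can serve as a policy, here is a standard-library-only sketch of the blanket-impl pattern. The DocId, Peer and Announce names are hypothetical stand-ins, not samod’s DocumentId, PeerId and AnnouncePolicy, and the real trait’s signature and bounds (for example Send or Clone requirements) may differ.

```rust
use std::collections::HashSet;

/// Hypothetical stand-ins for samod's DocumentId and PeerId.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct DocId(pub String);

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Peer(pub String);

/// Hypothetical policy trait: may `peer` learn about `doc`?
pub trait Announce {
    fn should_announce(&self, doc: &DocId, peer: &Peer) -> bool;
}

/// Blanket impl so any matching closure can be used as a policy.
impl<F> Announce for F
where
    F: Fn(&DocId, &Peer) -> bool,
{
    fn should_announce(&self, doc: &DocId, peer: &Peer) -> bool {
        self(doc, peer)
    }
}

/// Build a policy from an explicit allow-list of (document, peer) pairs.
pub fn allow_list(pairs: HashSet<(DocId, Peer)>) -> impl Announce {
    move |doc: &DocId, peer: &Peer| pairs.contains(&(doc.clone(), peer.clone()))
}

/// The peers a repo would announce `doc` to under `policy`.
pub fn announce_targets<P: Announce>(policy: &P, doc: &DocId, peers: &[Peer]) -> Vec<Peer> {
    peers
        .iter()
        .filter(|p| policy.should_announce(doc, p))
        .cloned()
        .collect()
}
```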

§Runtimes

RuntimeHandle is a trait which is intended to abstract over the various async runtimes available in the Rust ecosystem. The most common runtime is tokio. tokio is a work-stealing runtime, which means that futures spawned on it must be Send so that they can be moved between threads. Consequently RuntimeHandle::spawn requires Send futures, and in turn the futures returned by the Storage and AnnouncePolicy traits must also be Send so that they can be spawned onto the RuntimeHandle.

In many cases, though, you may have a runtime which doesn’t require Send futures, and storage and announce policy implementations which cannot produce Send futures. This is often the case with single-threaded runtimes, for example. In these cases you can instead implement LocalRuntimeHandle, which doesn’t require Send futures, and then implement the LocalStorage and LocalAnnouncePolicy traits for your storage and announce policy implementations. You configure all of these via the RepoBuilder struct. Once you’ve configured the storage and announce policy implementations to use the local variants, you can create a local Repo using RepoBuilder::load_local.
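For intuition about what a local runtime permits, the sketch below is a minimal single-threaded executor whose spawn_local accepts futures without a Send bound, so tasks may capture Rc and RefCell. LocalExecutor is hypothetical and is not the LocalRuntimeHandle trait; it uses a no-op waker and simply re-polls pending tasks, which is fine for a demonstration but not for production.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A boxed task with no `Send` bound, so it may hold `Rc`, `RefCell`, etc.
type LocalTask = Pin<Box<dyn Future<Output = ()>>>;

/// Waker that does nothing; the executor just re-polls pending tasks.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical single-threaded executor (not samod's LocalRuntimeHandle).
#[derive(Default)]
pub struct LocalExecutor {
    queue: RefCell<VecDeque<LocalTask>>,
}

impl LocalExecutor {
    /// Queue a future; note there is no `Send` bound.
    pub fn spawn_local(&self, fut: impl Future<Output = ()> + 'static) {
        self.queue.borrow_mut().push_back(Box::pin(fut));
    }

    /// Poll queued tasks round by round until the queue is empty or a
    /// whole round completes no task.
    pub fn run_until_stalled(&self) {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        loop {
            let round = self.queue.borrow().len();
            if round == 0 {
                return;
            }
            let mut progressed = false;
            for _ in 0..round {
                let next = self.queue.borrow_mut().pop_front();
                let Some(mut task) = next else { return };
                let poll = task.as_mut().poll(&mut cx);
                match poll {
                    Poll::Ready(()) => progressed = true,
                    Poll::Pending => self.queue.borrow_mut().push_back(task),
                }
            }
            if !progressed {
                return;
            }
        }
    }
}
```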

§Concurrency

Typically samod will be managing many documents: one for each DocHandle you retrieve via Repo::create or Repo::find, but also one for each document that remote peers send sync messages about (e.g. a sync server would have no DocHandles open but would still be running many document processes). By default, document tasks are run on the async runtime provided to the RepoBuilder, but this can be undesirable: document operations can be compute-intensive, so responsiveness may benefit from running them on a separate thread pool. This is the purpose of the RepoBuilder::with_concurrency method, which allows you to configure how document operations are processed. To use the thread pool approach you will need to enable the threadpool feature.
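The idea behind moving document work off the async runtime can be sketched with a fixed-size pool of worker threads fed by a channel. WorkerPool is a hypothetical illustration using only the standard library, not samod’s threadpool implementation or ConcurrencyConfig.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool for compute-heavy work.
pub struct WorkerPool {
    tx: Option<Sender<Job>>,
    workers: Vec<JoinHandle<()>>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        let (tx, rx) = channel::<Job>();
        let rx = Arc::new(Mutex::new(rx));
        let workers = (0..size)
            .map(|_| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || loop {
                    // The lock is held only while receiving, not while running the job.
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // channel closed: shut down
                    }
                })
            })
            .collect();
        WorkerPool { tx: Some(tx), workers }
    }

    /// Hand a job to whichever worker is free next.
    pub fn execute(&self, job: impl FnOnce() + Send + 'static) {
        self.tx.as_ref().expect("pool is running").send(Box::new(job)).expect("workers alive");
    }
}

impl Drop for WorkerPool {
    fn drop(&mut self) {
        drop(self.tx.take()); // closing the channel lets workers finish queued jobs and exit
        for w in self.workers.drain(..) {
            let _ = w.join();
        }
    }
}
```

The async side only enqueues work and waits for a result, so expensive merges never block the runtime’s own threads.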

§Why not just Automerge?

automerge is a low-level library. It provides routines for manipulating documents in memory and an abstract data sync protocol, but it does not hook these up to any kind of network or storage. Most of the work involved in this plumbing is straightforward, but if every application does it itself, we don’t end up with interoperable applications. In particular, we don’t end up with fungible sync servers. One of the core goals of this library is to let application authors be agnostic about where the user synchronizes data, by implementing a generic network and storage layer which all applications can use.

§Example

Here’s a fairly complete example of using samod to manage a todo list:

use automerge::{ReadDoc, transaction::{Transactable as _}};
use futures::StreamExt as _;

use samod::ConnDirection;

use std::convert::Infallible;


let repo = samod::Repo::build_tokio().load().await; // You don't have to use tokio

// Create an initial skeleton for our todo list
let mut initial_doc = automerge::Automerge::new();
initial_doc.transact::<_, _, automerge::AutomergeError>(|tx| {
    tx.put_object(automerge::ROOT, "todos", automerge::ObjType::List)?;
    Ok(())
}).unwrap();

// Now create a `samod::DocHandle` using `Repo::create`
let doc_handle_1 = repo.create(initial_doc).await.unwrap();

// Now, create second repo, representing some other device
let repo2 = samod::Repo::build_tokio().load().await;

// Connect the two repos to each other
let (tx_to_1, rx_from_2) = futures::channel::mpsc::unbounded();
let (tx_to_2, rx_from_1) = futures::channel::mpsc::unbounded();
tokio::spawn(repo2.connect(rx_from_1.map(Ok::<_, Infallible>), tx_to_1, ConnDirection::Outgoing));
tokio::spawn(repo.connect(rx_from_2.map(Ok::<_, Infallible>), tx_to_2, ConnDirection::Incoming));

// Wait for the second repo to be connected to the first repo
repo2.when_connected(repo.peer_id()).await.unwrap();

// Now fetch the document on repo2
let doc2 = repo2.find(doc_handle_1.document_id().clone()).await.unwrap().unwrap();

// Create a todo list item in doc2
doc2.with_document(|doc| {
    doc.transact(|tx| {
        let todos = tx
            .get(automerge::ROOT, "todos")
            .unwrap()
            .expect("todos should exist")
            .1;
        tx.insert(todos, 0, "Buy milk")?;
        Ok::<_, automerge::AutomergeError>(())
    }).unwrap();
});

// Wait for the change to be received on repo1
doc_handle_1.changes().next().await.unwrap();

// Check that the document handle on repo1 reflects the change
doc_handle_1.with_document(|doc| {
    let todos = doc
        .get(automerge::ROOT, "todos")
        .unwrap()
        .expect("todos should exist")
        .1;
    let item = doc.get(todos, 0).unwrap().expect("item should exist").0;
    let automerge::Value::Scalar(val) = item else {
        panic!("item should be a scalar");
    };
    let automerge::ScalarValue::Str(s) = val.as_ref() else {
        panic!("item should be a string");
    };
    assert_eq!(s, "Buy milk");
    Ok::<_, automerge::AutomergeError>(())
}).unwrap();

Modules§

runtime
storage
The storage abstraction
websocket

Structs§

AlwaysAnnounce
Always announce every document to every peer
AutomergeUrl
ConnectionInfo
The state of a connection to one peer
DocHandle
The state of a single automerge document the Repo is managing
DocumentId
Unique identifier for an automerge document.
PeerId
Unique identifier for a peer in the sync network.
Repo
The entry point to this library
RepoBuilder
A struct for configuring a Repo
Stopped
An error representing the fact that the repository has been stopped

Enums§

ConcurrencyConfig
How to run concurrent documents
ConnDirection
Indicates whether a connection was initiated locally or received from a remote peer
ConnFinishedReason
Why a connection future stopped

Traits§

AnnouncePolicy
Whether to announce a document to a peer
LocalAnnouncePolicy
A version of AnnouncePolicy that can be used with runtimes that don’t require Send or 'static bounds. See the module level documentation on runtimes for more details.