§Samod
`samod` is a library for building collaborative applications which work offline and don’t require servers (though servers certainly can be useful). This is achieved by representing data as `automerge` documents. `samod` is wire compatible with the `automerge-repo` JavaScript library.
§What does all that mean?
`samod` helps you manage automerge “documents”, which are hierarchical data structures composed of maps, lists, text, and primitive values - a little like JSON. Every change you make to a document is recorded, and you can move back and forth through the history of a document - it’s a bit like Git for JSON. `samod` takes care of storing changes for you and synchronizing them with connected peers. The interesting part is that, given this very detailed history which we never discard, we can merge documents containing changes which were made concurrently. This means that we can build applications which allow multiple users to edit the same document without routing every change through a server.
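For example, using the underlying `automerge` crate directly, two copies of a document can be edited independently and then merged without any server involved (a minimal sketch of the concept; `samod` adds the storage and networking around this for you):

use automerge::{Automerge, ReadDoc, transaction::Transactable as _};

// Two copies of the same document are edited concurrently...
let mut doc_a = Automerge::new();
doc_a.transact::<_, _, automerge::AutomergeError>(|tx| {
    tx.put(automerge::ROOT, "title", "Shopping list")?;
    Ok(())
}).unwrap();
let mut doc_b = doc_a.fork();
doc_a.transact::<_, _, automerge::AutomergeError>(|tx| {
    tx.put(automerge::ROOT, "owner", "alice")?;
    Ok(())
}).unwrap();
doc_b.transact::<_, _, automerge::AutomergeError>(|tx| {
    tx.put(automerge::ROOT, "priority", "high")?;
    Ok(())
}).unwrap();

// ...and merging applies both sets of concurrent changes.
doc_a.merge(&mut doc_b).unwrap();
assert!(doc_a.get(automerge::ROOT, "owner").unwrap().is_some());
assert!(doc_a.get(automerge::ROOT, "priority").unwrap().is_some());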
§How it all works
The library is structured around a `Repo`, which talks to a `Storage` instance and which you can connect to other peers using `Repo::connect`. Once you have a `Repo` you can create documents using `Repo::create`, or look up existing documents using `Repo::find`. In either case you will get back a `DocHandle` which you can use to interact with the document.
Typically, your workflow will look like this:
- Initialize a `Repo` at application startup, passing it a `RuntimeHandle` implementation and a `Storage` implementation
- Whenever you have connections available (maybe you are connecting to a sync server, maybe you are receiving peer-to-peer connections), call `Repo::connect` to drive the connection state
- Create `DocHandle`s using `Repo::create` and look up existing documents using `Repo::find`
- Modify documents using `DocHandle::with_document`
Let’s walk through each of those steps.
§Initializing a Repo
To initialize a `Repo` you call `Repo::builder()` to obtain a `RepoBuilder`, which you use to configure the repo before calling `RepoBuilder::load()` to actually load the repository. For example:
tokio_test::block_on(async {
    let repo = samod::Repo::builder(tokio::runtime::Handle::current())
        .with_storage(samod::storage::InMemoryStorage::new())
        .load()
        .await;
});
The first argument to `builder` is an implementation of `RuntimeHandle`. Default implementations are provided for `tokio` and `gio`, which can be conveniently used via `Repo::build_tokio` and `Repo::build_gio` respectively. The `RuntimeHandle` trait is straightforward to implement if you want to use some other async runtime.
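For example, a minimal sketch using the tokio convenience constructor (roughly equivalent to the `Repo::builder(tokio::runtime::Handle::current())` form shown above):

tokio_test::block_on(async {
    // build_tokio() wires up the tokio RuntimeHandle implementation for you
    let repo = samod::Repo::build_tokio().load().await;
});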
By default `samod` uses an in-memory storage implementation. This is great for prototyping, but in most cases you do actually want to persist data somewhere. In that case you’ll need an implementation of `Storage` to pass to `RepoBuilder::with_storage`.

It is possible to use `Storage` and `AnnouncePolicy` implementations which do not produce `Send` futures. In this case you will also need a runtime which can spawn non-`Send` futures. See the runtimes section below for more details.
§Connecting to peers
Once you have a `Repo` you can connect it to peers using `Repo::connect`. This method returns a future which must be driven to completion to run the connection. Here’s an example where we use `futures::channel::mpsc` as the transport. We create two repos and connect them with these channels.
use samod::ConnDirection;
use futures::{StreamExt, channel::mpsc};
use std::convert::Infallible;

tokio_test::block_on(async {
    let alice = samod::Repo::build_tokio().load().await;
    let bob = samod::Repo::build_tokio().load().await;

    // Set up bidirectional channels
    let (tx_to_bob, rx_from_alice) = mpsc::unbounded();
    let (tx_to_alice, rx_from_bob) = mpsc::unbounded();

    // This is just to make the types line up, ignore it
    let rx_from_alice = rx_from_alice.map(Ok::<_, Infallible>);
    let rx_from_bob = rx_from_bob.map(Ok::<_, Infallible>);

    // Run the connection futures
    tokio::spawn(alice.connect(rx_from_bob, tx_to_bob, ConnDirection::Outgoing));
    tokio::spawn(bob.connect(rx_from_alice, tx_to_alice, ConnDirection::Incoming));
});
If you are using `tokio` and connecting to something like a TCP socket you can use `Repo::connect_tokio_io`, which reduces some boilerplate:
use samod::ConnDirection;
let repo: samod::Repo = todo!();
let io = tokio::net::TcpStream::connect("sync.automerge.org").await.unwrap();
tokio::spawn(repo.connect_tokio_io(io, ConnDirection::Outgoing));
If you are connecting to a JavaScript sync server using WebSockets you can enable the `tungstenite` feature and use `Repo::connect_websocket`. If you are accepting websocket connections in an `axum` server you can use `Repo::accept_axum`.
§Managing Documents
Once you have a `Repo` you can use it to manage `DocHandle`s. A `DocHandle` represents an automerge document which the `Repo` is managing. “Managing” here means a few things:
- Any changes made to the document using `DocHandle::with_document` will be persisted to storage and synchronized with connected peers (subject to the `AnnouncePolicy`).
- Any changes received from connected peers will be applied to the document and made visible to the application. You can listen for these changes using `DocHandle::changes`, as in the sketch below.
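A minimal sketch of listening for changes (this assumes `handle` is a `DocHandle` you already obtained and that it can be moved into a spawned task):

use futures::StreamExt as _;

let handle: samod::DocHandle = todo!();
tokio::spawn(async move {
    let mut changes = handle.changes();
    while changes.next().await.is_some() {
        // The document changed (e.g. a sync message arrived from a peer);
        // re-read it with DocHandle::with_document and update your UI here.
    }
});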
To create a new document you use `Repo::create`, which will return once the document has been persisted to storage. To look up an existing document you use `Repo::find`. This will first look in storage; if the document is not found there, it will request the document from all connected peers (again subject to the `AnnouncePolicy`). If any peer has the document, the future returned by `Repo::find` will resolve once we have synchronized with at least one remote peer which has the document.
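As the example at the end of this page suggests, the future returned by `Repo::find` resolves to a nested result: an error if the repository has stopped, and `None` if the document could not be found in storage or from any connected peer (a sketch; check `Repo::find` for the exact types):

let repo: samod::Repo = todo!();
let doc_id: samod::DocumentId = todo!();
match repo.find(doc_id).await {
    Ok(Some(handle)) => {
        // Found in storage or fetched from a peer
        let _ = handle;
    }
    Ok(None) => {
        // Neither storage nor any connected peer has this document
    }
    Err(_stopped) => {
        // The repository has been stopped
    }
}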
You can make changes to a document using `DocHandle::with_document`.
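A minimal sketch of making a change through a handle, mirroring the larger example below (`handle` again stands in for an existing `DocHandle`):

use automerge::transaction::Transactable as _;

let handle: samod::DocHandle = todo!();
handle.with_document(|doc| {
    doc.transact::<_, _, automerge::AutomergeError>(|tx| {
        tx.put(automerge::ROOT, "status", "done")?;
        Ok(())
    }).unwrap();
});
// The change will be persisted to storage and synchronized with connected peers.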
§Announce Policies
By default, `samod` will announce all the `DocHandle`s it is synchronizing to all connected peers and will also send requests to any connected peers when you call `Repo::find`. This is often not what you want. To customize this logic you pass an implementation of `AnnouncePolicy` to `RepoBuilder::with_announce_policy`. Note that `AnnouncePolicy` is implemented for `Fn(&DocumentId) -> bool`, so you can just pass a closure if you want:
tokio_test::block_on(async {
    let authorized_peer = samod::PeerId::from("alice");
    let repo = samod::Repo::build_tokio()
        .with_announce_policy(move |_doc_id, peer_id| {
            // Only announce documents to alice
            &peer_id == &authorized_peer
        })
        .load()
        .await;
});
§Runtimes
`RuntimeHandle` is a trait which is intended to abstract over the various runtimes available in the Rust ecosystem. The most common runtime is `tokio`. `tokio` is a work-stealing runtime, which means that the futures spawned on it must be `Send` so that they can be moved between threads. This means that `RuntimeHandle::spawn` requires `Send` futures, which in turn means that the futures returned by the `Storage` and `AnnouncePolicy` traits must also be `Send` so that they can be spawned onto the `RuntimeHandle`.
In many cases, though, you may have a runtime which doesn’t require `Send` futures, and you may have storage and announce policy implementations which cannot produce `Send` futures. This is often the case for single-threaded runtimes, for example. In these cases you can instead implement `LocalRuntimeHandle`, which doesn’t require `Send` futures, and implement the `LocalStorage` and `LocalAnnouncePolicy` traits for your storage and announce policy implementations. You configure all of these via the `RepoBuilder` struct. Once you’ve configured the storage and announce policy implementations to use the local variants, you can create a local `Repo` using `RepoBuilder::load_local`.
§Concurrency
Typically `samod` will be managing many documents: one for each `DocHandle` you retrieve via `Repo::create` or `Repo::find`, but also one for any document about which sync messages are received from remote peers (e.g. a sync server would have no `DocHandle`s open but would still be running many document processes). By default, document tasks are run on the async runtime provided to the `RepoBuilder`, but this can be undesirable: document operations can be compute intensive, so responsiveness may benefit from running them on a separate thread pool. This is the purpose of the `RepoBuilder::with_concurrency` method, which allows you to configure how document operations are processed. If you want to use the threadpool approach you will need to enable the `threadpool` feature.
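As a rough sketch of what this might look like (the `ConcurrencyConfig` variant name below is hypothetical; see the `ConcurrencyConfig` enum for the options the crate actually provides, and remember to enable the `threadpool` feature):

tokio_test::block_on(async {
    let repo = samod::Repo::build_tokio()
        // Hypothetical variant name, shown only for illustration; consult
        // samod::ConcurrencyConfig for the real options.
        .with_concurrency(samod::ConcurrencyConfig::Threadpool)
        .load()
        .await;
});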
§Why not just Automerge?
`automerge` is a low-level library. It provides routines for manipulating documents in memory and an abstract data sync protocol, but it does not actually hook these up to any kind of network or storage. Most of the work involved in doing this plumbing is straightforward, but if every application does it itself, we don’t end up with interoperable applications - in particular, we don’t end up with fungible sync servers. One of the core goals of this library is to allow application authors to be agnostic as to where the user synchronises data, by implementing a generic network and storage layer which all applications can use.
§Example
Here’s a fairly complete example of using `samod` to manage a todo list:
use automerge::{ReadDoc, transaction::Transactable as _};
use futures::StreamExt as _;
use samod::ConnDirection;
use std::convert::Infallible;

let repo = samod::Repo::build_tokio().load().await; // You don't have to use tokio

// Create an initial skeleton for our todo list
let mut initial_doc = automerge::Automerge::new();
initial_doc.transact::<_, _, automerge::AutomergeError>(|tx| {
    let todos = tx.put_object(automerge::ROOT, "todos", automerge::ObjType::List)?;
    Ok(())
}).unwrap();

// Now create a `samod::DocHandle` using `Repo::create`
let doc_handle_1 = repo.create(initial_doc).await.unwrap();

// Now, create a second repo, representing some other device
let repo2 = samod::Repo::build_tokio().load().await;

// Connect the two repos to each other
let (tx_to_1, rx_from_2) = futures::channel::mpsc::unbounded();
let (tx_to_2, rx_from_1) = futures::channel::mpsc::unbounded();
tokio::spawn(repo2.connect(rx_from_1.map(Ok::<_, Infallible>), tx_to_1, ConnDirection::Outgoing));
tokio::spawn(repo.connect(rx_from_2.map(Ok::<_, Infallible>), tx_to_2, ConnDirection::Incoming));

// Wait for the second repo to be connected to the first repo
repo2.when_connected(repo.peer_id()).await.unwrap();

// Now fetch the document on repo2
let doc2 = repo2.find(doc_handle_1.document_id().clone()).await.unwrap().unwrap();

// Create a todo list item in doc2
doc2.with_document(|doc| {
    doc.transact(|tx| {
        let todos = tx.get(automerge::ROOT, "todos").unwrap().expect("todos should exist").1;
        tx.insert(todos, 0, "Buy milk")?;
        Ok::<_, automerge::AutomergeError>(())
    }).unwrap();
});

// Wait for the change to be received on repo1
doc_handle_1.changes().next().await.unwrap();

// Check that the document handle on repo1 reflects the change
doc_handle_1.with_document(|doc| {
    let todos = doc.get(automerge::ROOT, "todos").unwrap().expect("todos should exist").1;
    let item = doc.get(todos, 0).unwrap().expect("item should exist").0;
    let automerge::Value::Scalar(val) = item else {
        panic!("item should be a scalar");
    };
    let automerge::ScalarValue::Str(s) = val.as_ref() else {
        panic!("item should be a string");
    };
    assert_eq!(s, "Buy milk");
    Ok::<_, automerge::AutomergeError>(())
}).unwrap();
Modules§
Structs§
- AlwaysAnnounce - Always announce every document to every peer
- AutomergeUrl
- ConnectionInfo - The state of a connection to one peer
- DocHandle - The state of a single automerge document the `Repo` is managing
- DocumentId - Unique identifier for an automerge document.
- PeerId - Unique identifier for a peer in the sync network.
- Repo - The entry point to this library
- RepoBuilder - A struct for configuring a `Repo`
- Stopped - An error representing the fact that the repository has been stopped
Enums§
- ConcurrencyConfig - How to run concurrent documents
- ConnDirection - Indicates whether a connection was initiated locally or received from a remote peer
- ConnFinishedReason - Why a connection future stopped
Traits§
- AnnouncePolicy - Whether to announce a document to a peer
- LocalAnnouncePolicy - A version of `AnnouncePolicy` that can be used with runtimes that don’t require `Send` or `'static` bounds. See the module level documentation on runtimes for more details.