# hakuban 0.8.5
#
# Data-object sharing library
# Documentation
import * as hakuban from "../hakuban-with-wasm.js"
import m from "mithril"

# Returns a promise that is fulfilled after `s` seconds (fractional values allowed).
sleep = (s)->
	new Promise (wake)->
		setTimeout(wake, s*1000.0)

# Minimal assertion helper used by the tests in this file.
#   number  - identifier of the assertion within its test; included in the failure message
#   bool    - condition that must hold (any truthy value passes)
#   message - optional extra text appended to the failure message
# Throws an Error when `bool` is falsy, so the failure carries a stack trace
# (a bare string, as thrown previously, does not).
assert = (number, bool, message)->
	return if bool
	throw new Error("Assertion '" + number + "' failed! " + (message ? ""))

# Hand-over point used to sequence two concurrently running async flows.
# Every call to `next` first fulfils the promise handed out by the previous call
# (if there was one) with `arg`, then returns a fresh pending promise that the
# following call will fulfil.
class Order
	next: (arg)->
		if @promise_resolve?
			@promise_resolve(arg)
		@promise = new Promise (resolve)=>
			@promise_resolve = resolve
			null
		@promise



tests =

	# Smoke test: an Exchange can be constructed and dropped right away.
	exchange_create_and_drop: ()->
		hub = new hakuban.Exchange("exchange")
		hub.drop()
		return "PASS"

	# Smoke test: a TagDescriptor can be constructed from the JS side.
	tag_descriptor_create_on_js_side: ()->
		tag = new hakuban.TagDescriptor("xxx")
		return "PASS"

	# Smoke test: an ObjectDescriptor can be constructed from a TagDescriptor instance plus a JSON value.
	object_descriptor_create_on_js_side: ()->
		tag = new hakuban.TagDescriptor("xxx")
		described = new hakuban.ObjectDescriptor([tag],"yyy")
		return "PASS"

	# Smoke test: an object-observe contract can be built on a fresh exchange.
	object_observe_contract_create: ()->
		hub = new hakuban.Exchange("exchange")
		target = new hakuban.ObjectDescriptor(["b"],"a")
		observer = hub.object_observe_contract(target).build()
		return "PASS"

	# Smoke test: an object-expose contract can be built on a fresh exchange.
	object_expose_contract_create: ()->
		hub = new hakuban.Exchange("exchange")
		target = new hakuban.ObjectDescriptor(["b"],"a")
		exposer = hub.object_expose_contract(target).build()
		return "PASS"

	# The descriptor reported by a state sink must serialize to the same JSON as
	# the descriptor the contracts were built with.
	object_descriptor_can_be_received_from_wasm: ()->
		hub = new hakuban.Exchange("exchange")
		sent = new hakuban.ObjectDescriptor(["b"],"a")
		# The observer handle is not used below but is kept in a local, as in every other test here;
		# presumably the exposer is only handed a sink while an observer exists — TODO confirm.
		observer = hub.object_observe_contract(sent).build()
		exposer = hub.object_expose_contract(sent).build()
		sink = await exposer.next()
		received = sink.descriptor()
		assert(1, sent.json == received.json)
		# NOTE(review): this inspects the locally built descriptor, not `received` — confirm that is intended.
		assert(2, sent.tags[0].json == "b")
		return "PASS"

	# A state sent through an object-expose contract is delivered unchanged
	# (version, format, payload) to an object-observe contract on the same exchange.
	object_object_instant_propagation: ()->
		hub = new hakuban.Exchange("exchange")
		target = new hakuban.ObjectDescriptor(["b"],"a")
		observer = hub.object_observe_contract(target).build()
		exposer = hub.object_expose_contract(target).build()
		payload = new TextEncoder().encode("data")
		published = new hakuban.ObjectState(payload, [1,2], ["t1"])
		sink = await exposer.next()
		await sink.send(published)
		stream = await observer.next()
		assert(1, stream.descriptor().json == "a")
		received = await stream.next()
		# Version elements are compared against BigInt literals.
		assert(2, received.version.length == 2)
		assert(3, received.version[0] == 1n)
		assert(4, received.version[1] == 2n)
		assert(5, received.format.length == 1)
		assert(6, received.format[0] == "t1")
		assert(7, received.synchronized_us_ago == 0n)
		decoded = new TextDecoder().decode(received.data)
		assert(8, decoded == "data")
		return "PASS"

	# Same round trip as the plain propagation test, but the state is built with the
	# `with_*` builder methods and passed through json_serialize / json_deserialize.
	object_object_instant_propagation_with_json_serialization: ()->
		await sleep(0.1)  #so sync with 100ms doesn't trigger arithmetic underflow because of stupid timestamping method
		hub = new hakuban.Exchange("exchange")
		target = new hakuban.ObjectDescriptor(["b"],"a")
		observer = hub.object_observe_contract(target).build()
		exposer = hub.object_expose_contract(target).build()
		published = new hakuban.ObjectState({"data": 1})
			.with_version([1,2])
			.with_format("t1")
			.with_synchronized_us_ago(100000)
		sink = await exposer.next()
		await sink.send(published.json_serialize())
		stream = await observer.next()
		assert(1, stream.descriptor().json == "a")
		raw = await stream.next()
		received = raw.json_deserialize()
		assert(2, received.version.length == 2)
		assert(3, received.version[0] == 1n)
		assert(4, received.version[1] == 2n)
		assert(5, received.format.length == 1)
		assert(6, received.format[0] == "t1")
		# The state was declared 100000 us old when sent, so it must be at least that old on arrival.
		assert(7, received.synchronized_us_ago >= 100000n)
		assert(8, received.data.data == 1)
		return "PASS"

	# An observer that starts before any exposer exists still receives the state
	# once it is exposed. `order` checks that the observer's synchronous prefix
	# runs before the exposing side continues.
	object_object_delayed_expose_propagation: ()->
		exchange = new hakuban.Exchange("exchange")
		descriptor = new hakuban.ObjectDescriptor(["b"],"a")
		order = 0
		# Async IIFE rather than `new Promise` with an async executor: a failed
		# assertion now rejects `observed` (failing the test at `await observed`)
		# instead of leaving it pending forever. The body still runs synchronously
		# up to its first `await`, exactly like an executor would.
		observed = do ()->
			observe_contract = exchange.object_observe_contract(descriptor).build()
			assert(1, order++ == 0)
			object_state_stream = await observe_contract.next()
			state = await object_state_stream.next()
			assert(2, state.version.length == 2)
			assert(3, state.version[0] == 1n)
			assert(4, state.version[1] == 2n)
			assert(5, state.format.length == 1)
			assert(6, state.format[0] == "t1")
			assert(7, state.synchronized_us_ago == 0n)
			assert(8, new TextDecoder().decode(state.data) == "data")
			"PASS"
		assert(9, order++ == 1)
		expose_contract = exchange.object_expose_contract(descriptor).build()
		new_state = new hakuban.ObjectState(new TextEncoder().encode("data"), [1,2], ["t1"])
		object_state_sink = await expose_contract.next()
		await object_state_sink.send(new_state)
		assert(10, (await observed) == "PASS")
		"PASS"

	# A state exposed through an object-expose contract reaches a tag-observe
	# contract built for one of the object's tags.
	object_tag_instant_propagation: ()->
		hub = new hakuban.Exchange("exchange")
		tag = new hakuban.TagDescriptor("b")
		target = new hakuban.ObjectDescriptor([tag],"a")
		observer = hub.tag_observe_contract(tag).build()
		exposer = hub.object_expose_contract(target).build()
		published = new hakuban.ObjectState("data", [2], ["t1"])
		sink = await exposer.next()
		await sink.send(published)
		stream = await observer.next()
		received = await stream.next()
		assert(1, received.version[0] == 2n)
		return "PASS"

	# A state exposed through a tag-expose contract reaches an object-observe
	# contract built for an object carrying that tag.
	tag_object_instant_propagation: ()->
		hub = new hakuban.Exchange("exchange")
		tag = new hakuban.TagDescriptor("b")
		target = new hakuban.ObjectDescriptor([tag],"a")
		observer = hub.object_observe_contract(target).build()
		exposer = hub.tag_expose_contract(tag).build()
		published = new hakuban.ObjectState("data", [2], ["t1"])
		sink = await exposer.next()
		await sink.send(published)
		stream = await observer.next()
		received = await stream.next()
		assert(1, received.version[0] == 2n)
		return "PASS"

	# Checks that (a) a state sink yields non-null params while observed, (b) after
	# the sink is dropped the observer receives a state whose synchronized_us_ago is
	# greater than zero, and (c) `next()` on the dropped sink yields null.
	# `order` pins the interleaving of the observing and the exposing flow.
	object_state_sink_next_and_desynchronize_on_drop: ()->
		exchange = new hakuban.Exchange("exchange")
		descriptor = new hakuban.ObjectDescriptor(["b"],"a")
		order = 0
		# Signal from the observer to the exposing flow, kept in a local
		# instead of being stored on `this` (the test record).
		cont_tx = null
		cont_rx = new Promise (resolve)->
			cont_tx = resolve
			return
		# Async IIFE rather than `new Promise` with an async executor, so a failed
		# assertion rejects `observed` instead of leaving it pending forever.
		observed = do ()->
			observe_contract = exchange.object_observe_contract(descriptor).build()
			assert(1, order++ == 0)
			object_state_stream = await observe_contract.next()
			state = await object_state_stream.next()
			assert(2, state.synchronized_us_ago == 0n)
			assert(3, order++ == 2)
			cont_tx()
			state = await object_state_stream.next()
			assert(4, state.synchronized_us_ago > 0n)
			assert(5, order++ == 4)
			observe_contract.drop()
			"PASS"
		assert(6, order++ == 1)
		expose_contract = exchange.object_expose_contract(descriptor).build()
		new_state = new hakuban.ObjectState("data", [1,2], ["t1"])
		object_state_sink = await expose_contract.next()
		object_state_sink_params = await object_state_sink.next()
		assert(7, object_state_sink_params != null)
		await object_state_sink.send(new_state)
		# `observed` cannot fulfil before `cont_rx` does, so the race only matters when
		# the observer has failed: its rejection then fails the test instead of hanging here.
		await Promise.race([cont_rx, observed])
		assert(9, order++ == 3)
		object_state_sink.drop()
		assert(10, (await observed) == "PASS")
		object_state_sink_params = await object_state_sink.next()
		assert(11, object_state_sink_params == null)
		"PASS"

	# Dropping an observe contract from the main flow must make the observer's
	# state stream yield null. The contract handle is passed from the observer to
	# the main flow through `order`.
	contract_drop_from_other_fiber: ()->
		exchange = new hakuban.Exchange("exchange")
		descriptor = new hakuban.ObjectDescriptor(["b"],"a")
		# Local instead of `@order`: nothing outside this test reads it.
		order = new Order()
		# Async IIFE rather than `new Promise` with an async executor, so a failed
		# assertion rejects `observed` instead of leaving it pending forever.
		observed = do ()->
			observe_contract = exchange.tag_observe_contract(descriptor.tags[0]).build()
			object_state_stream = await observe_contract.next()
			assert(2, object_state_stream != null)
			state = await object_state_stream.next()
			assert(3, state != null)
			# Hands the contract over and waits until the main flow calls order.next() again.
			await order.next(observe_contract)
			state = await object_state_stream.next()
			assert(4, state == null)
			"PASS"
		expose_contract = exchange.object_expose_contract(descriptor).build()
		new_state = new hakuban.ObjectState("data")
		object_state_sink = await expose_contract.next()
		await object_state_sink.send(new_state)
		# `observed` cannot fulfil before the hand-over, so the race only matters when the
		# observer has failed: its rejection then fails the test instead of hanging here.
		contract_to_drop = await Promise.race([order.next(), observed])
		await sleep(0.1) #hoping that the other fiber starts awaiting stream.next
		contract_to_drop.drop()
		order.next()
		assert(5, (await observed) == "PASS")
		"PASS"

	# Dropping a state stream from the main flow must make the observer's pending
	# or following `next()` on that stream yield null. The stream handle is passed
	# from the observer to the main flow through `order`.
	stream_drop_from_other_fiber: ()->
		exchange = new hakuban.Exchange("exchange")
		descriptor = new hakuban.ObjectDescriptor(["b"],"a")
		# Local instead of `@order`: nothing outside this test reads it.
		order = new Order()
		# Async IIFE rather than `new Promise` with an async executor, so a failed
		# assertion rejects `observed` instead of leaving it pending forever.
		observed = do ()->
			observe_contract = exchange.tag_observe_contract(descriptor.tags[0]).build()
			object_state_stream = await observe_contract.next()
			assert(2, object_state_stream != null)
			state = await object_state_stream.next()
			assert(3, state != null)
			# Hands the stream over and waits until the main flow calls order.next() again.
			await order.next(object_state_stream)
			state = await object_state_stream.next()
			assert(4, state == null)
			"PASS"
		expose_contract = exchange.object_expose_contract(descriptor).build()
		new_state = new hakuban.ObjectState("data")
		object_state_sink = await expose_contract.next()
		await object_state_sink.send(new_state)
		# `observed` cannot fulfil before the hand-over, so the race only matters when the
		# observer has failed: its rejection then fails the test instead of hanging here.
		stream_to_drop = await Promise.race([order.next(), observed])
		await sleep(0.1) #hoping that the other fiber starts awaiting stream.next
		stream_to_drop.drop()
		order.next()
		assert(5, (await observed) == "PASS")
		"PASS"


	# After a state stream is dropped, the observe contract must hand out a new
	# stream for the same object, and that stream must deliver the current state.
	object_state_stream_reemit_new_stream_on_drop: ()->
		hub = new hakuban.Exchange("exchange")
		target = new hakuban.ObjectDescriptor(["b"],"a")
		observer = hub.object_observe_contract(target).build()
		exposer = hub.object_expose_contract(target).build()
		payload = new TextEncoder().encode("data")
		published = new hakuban.ObjectState(payload, [1,2], ["t1"])
		sink = await exposer.next()
		await sink.send(published)
		first_stream = await observer.next()
		first_stream.drop()
		second_stream = await observer.next()
		received = await second_stream.next()
		assert(1, received.version.length == 2)
		assert(2, received.version[0] == 1n)
		assert(3, received.version[1] == 2n)
		assert(4, received.format.length == 1)
		assert(5, received.format[0] == "t1")
		assert(6, received.synchronized_us_ago == 0n)
		decoded = new TextDecoder().decode(received.data)
		assert(7, decoded == "data")
		return "PASS"

	# Exercises the callback ("block") flavours of the API: `build`, `next` and
	# `for_each` taking a function, plus async iteration over a contract.
	# An exposing flow (thread1) and an observing flow (thread2) run concurrently
	# and take turns through `order`.
	block_api: ()->
		# The exchange is also published on `window`, as before.
		window.exchange = exchange = new hakuban.Exchange("exchange")
		tag_descriptor = new hakuban.TagDescriptor("b")
		object_descriptor = new hakuban.ObjectDescriptor([tag_descriptor],"a")
		state1 = new hakuban.ObjectState(new TextEncoder().encode("state1"))
		state2 = new hakuban.ObjectState(new TextEncoder().encode("state2"))
		state3 = new hakuban.ObjectState(new TextEncoder().encode("state3"))
		order = new Order()

		# Both flows are async IIFEs rather than `new Promise` with an async executor,
		# so an exception or failed assertion rejects the flow's promise instead of
		# leaving it pending forever.
		thread1 = do ()->
			await exchange.object_expose_contract(object_descriptor).build (contract)->
				await contract.next (state_sink)->
					await state_sink.send(state1)
				await order.next()
				for await state_sink from contract
					await state_sink.send(state2)
					await order.next()
					await state_sink.send(state3)
					await order.next()
					await order.next()
					await contract.drop()
			return

		thread2 = do ()->
			await exchange.object_observe_contract(object_descriptor).build (contract)->
				await contract.next (state_stream)->
					await state_stream.next (state)->
						assert(1, new TextDecoder().decode(state.data) == new TextDecoder().decode(state1.data))
					await order.next()
				iterations = 0
				await contract.for_each (state_stream)->
					await state_stream.for_each (state)->
						iterations++
						if iterations == 1
							assert(2, new TextDecoder().decode(state.data) == new TextDecoder().decode(state1.data))
						if iterations == 2
							assert(3, new TextDecoder().decode(state.data) == new TextDecoder().decode(state2.data))
						if iterations > 2
							await contract.drop()
						await order.next()
			order.next()
			return

		# Waiting on both at once lets a failure in either flow fail the test,
		# even while the other flow is still blocked.
		await Promise.all([thread1, thread2])
		"PASS"

	# An ObjectState that was constructed but never sent anywhere can still be dropped.
	never_materialized_state_can_be_dropped: ()->
		unsent = new hakuban.ObjectState("data", [1,2], ["t1"])
		await unsent.drop()
		return "PASS"




window.hakuban = hakuban

# Test runner: initializes hakuban, then runs the tests one after another,
# re-rendering the result page before and after each of them.
# The URL fragment (text after '#') selects tests: it is passed to
# String::match, i.e. interpreted as a regular expression matched against the
# test name. An empty fragment runs every test.
window.onload = ()->
	assert(-1, (await hakuban.initialize("trace"))?, "failed to initialize hakuban")

	filter = window.location.hash.slice(1)

	state =
		tests: ({name: name, state: "waiting", body: body} for name, body of tests)

	redraw = ()->
		m.render(document.body, m(Page, {state: state}))

	for test in state.tests
		# `filter` is always a string, so test it for emptiness rather than for existence.
		if not filter or test.name.match(filter)
			console.log("Running:", test.name)
			test.state = "running"
			redraw()
			try
				test.state = await test.body()
			catch error
				test.state = "FAIL"
				console.error(error)
			redraw()
		else
			test.state = "disabled"

	# Tests disabled after the last executed one (or all of them, when nothing
	# matched) would otherwise never be rendered and stay displayed as "waiting".
	redraw()
	return



# Mithril component rendering the result table: a two-column grid with one
# row per test, showing the test name and its current state. The state text is
# coloured by looking the state up in a fixed table; states missing from the
# table get no style.
class Page
	view: (vnode)->
		palette =
			waiting: { color: "gray" }
			running: { color: "blue" }
			PASS: { color: "green" }
			FAIL: { color: "red" }
			disabled: { color: "gray" }

		grid_style = { display: "grid", "grid-template-columns": "max-content auto", margin: "2em" }
		name_style = { "padding-right": "2em" }

		rows = vnode.attrs.state.tests.map (test)->
			[
				m("div", { style: name_style }, test.name)
				m("div", { style: palette[test.state] }, test.state)
			]

		m("div", { style: grid_style }, rows)