elbus 0.2.21

Local and network IPC bus
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
[![Build status](https://github.com/DirtyHairy/async-mutex/workflows/Build%20and%20Tests/badge.svg)](https://github.com/DirtyHairy/async-mutex/actions?query=workflow%3A%22Build+and+Tests%22)
[![NPM version](https://badge.fury.io/js/async-mutex.svg)](https://badge.fury.io/js/async-mutex)
[![Coverage Status](https://coveralls.io/repos/github/DirtyHairy/async-mutex/badge.svg?branch=master)](https://coveralls.io/github/DirtyHairy/async-mutex?branch=master)

# What is it?

This package implements primitives for synchronizing asynchronous operations in
Javascript.

## Mutex

The term "mutex" usually refers to a data structure used to synchronize
concurrent processes running on different threads. For example, before accessing
a non-threadsafe resource, a thread will lock the mutex. This is guaranteed
to block the thread until no other thread holds a lock on the mutex and thus
enforces exclusive access to the resource. Once the operation is complete, the
thread releases the lock, allowing other threads to acquire a lock and access the
resource.

While Javascript is strictly single-threaded, the asynchronous nature of its
execution model allows for race conditions that require similar synchronization
primitives. Consider for example a library communicating with a web worker that
needs to exchange several subsequent messages with the worker in order to achieve
a task. As these messages are exchanged in an asynchronous manner, it is perfectly
possible that the library is called again during this process. Depending on the
way state is handled during the async process, this will lead to race conditions
that are hard to fix and even harder to track down.

This library solves the problem by applying the concept of mutexes to Javascript.
Locking the mutex will return a promise that resolves once the mutex becomes
available. Once the async process is complete (usually taking multiple
spins of the event loop), a callback supplied to the caller is called in order
to release the mutex, allowing the next scheduled worker to execute.

# Semaphore

Imagine a situation where you need to control access to several instances of
a shared resource. For example, you might want to distribute images between several
worker processes that perform transformations, or you might want to create a web
crawler that performs a defined number of requests in parallel.

A semaphore is a data structure that is initialized to a positive integer value and that
can be locked multiple times.
As long as the semaphore value is positive, locking it will return the current value
and the locking process will continue execution immediately; the semaphore will
be decremented upon locking. Releasing the lock will increment the semaphore again.

Once the semaphore has reached zero, the next process that attempts to acquire a lock
will be suspended until another process releases its lock and this increments the semaphore
again.

This library provides a semaphore implementation for Javascript that is similar to the
mutex implementation described above.

# How to use it?

## Installation

You can install the library into your project via npm

    npm install async-mutex

The library is written in TypeScript and will work in any environment that
supports ES5, ES6 promises and `Array.isArray`. On ancient browsers,
a shim can be used (e.g. [core-js](https://github.com/zloirock/core-js)).
No external typings are required for using this library with
TypeScript (version >= 2).

Starting with Node 12.16 and 13.7, native ES6 style imports are supported.

**WARNING:** Node 13 versions < 13.2.0 fail to import this package correctly.
Node 12 and earlier are fine, as are newer versions of Node 13.

## Importing

**CommonJS:**
```javascript
var Mutex = require('async-mutex').Mutex;
var Semaphore = require('async-mutex').Semaphore;
var withTimeout = require('async-mutex').withTimeout;
```

**ES6:**
```javascript
import {Mutex, Semaphore, withTimeout} from 'async-mutex';
```

**TypeScript:**
```typescript
import {Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout} from 'async-mutex';
```

With the latest version of Node, native ES6 style imports are supported.

##  Mutex API

### Creating

```typescript
const mutex = new Mutex();
```

Create a new mutex.

### Synchronized code execution

Promise style:
```typescript
mutex
    .runExclusive(function() {
        // ...
    })
    .then(function(result) {
        // ...
    });
```

async/await:
```typescript
await mutex.runExclusive(async () => {
    // ...
});
```

`runExclusive` schedules the supplied callback to be run once the mutex is unlocked.
The function may return a promise. Once the promise is resolved or rejected (or immediately after
execution if an immediate value was returned),
the mutex is released. `runExclusive` returns a promise that adopts the state of the function result.

The mutex is released and the result rejected if an exception occurs during execution
if the callback.

### Manual locking / releasing

Promise style:
```typescript
mutex
    .acquire()
    .then(function(release) {
        // ...

        release();
    });
```

async/await:
```typescript
const release = await mutex.acquire();
try {
    // ...
} finally {
    release();
}
```

`acquire` returns an (ES6) promise that will resolve as soon as the mutex is
available. The promise resolves with a function `release` that
must be called once the mutex should be released again.

**IMPORTANT:** Failure to call `release` will hold the mutex locked and will
likely deadlock the application. Make sure to call `release` under all circumstances
and handle exceptions accordingly.

`acquire` / `release` should be considered a low level API. In most situations,
`runExclusive` will be a better choice that automatically takes care of releasing
the mutex once a block of code has executed exclusively.

### Checking whether the mutex is locked

```typescript
mutex.isLocked();
```

### Cancelling pending locks.

Pending locks can be cancelled by calling `cancel()` on the mutex. This will reject
all pending locks with `E_CANCELED`:

Promise style:
```typescript
import {E_CANCELED} from 'async-mutex';

mutex
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_CANCELED) {
            // ...
        }
    });
```

async/await:
```typescript
import {E_CANCELED} from 'async-mutex';

try {
    await mutex.runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_CANCELED) {
        // ...
    }
}
```

This works with `acquire`, too:
if `acquire` is used for locking, the resulting promise will reject with `E_CANCELED`.

The error that is thrown can be customized by passing a different error to the `Mutex`
constructor:

```typescript
const mutex = new Mutex(new Error('fancy custom error'));
```

Note that while all pending locks are cancelled, a currently held lock will not be
revoked. In consequence, the mutex may not be available even after `cancel()` has been called.

### Waiting until the mutex is available

You can wait until the mutex is available without locking it by calling `waitForUnlock()`.
This will return a promise that resolve once the mutex can be acquired again. This operation
will not lock the mutex, and there is no gurantee that the mutex will still be available
once a async barrier has been encountered.

Promise style:
```typescript
mutex
    .waitForUnlock()
    .then(() => {
        // ...
    });
```

Async/await:
```typescript
await mutex.waitForUnlock();
// ...
```


##  Semaphore API

### Creating

```typescript
const semaphore = new Semaphore(initialValue);
```

Creates a new semaphore. `initialValue` is a positive integer that defines the
initial value of the semaphore (aka the maximum number of concurrent consumers).

### Synchronized code execution

Promise style:
```typescript
semaphore
    .runExclusive(function(value) {
        // ...
    })
    .then(function(result) {
        // ...
    });
```

async/await:
```typescript
await semaphore.runExclusive(async (value) => {
    // ...
});
```

`runExclusive` schedules the supplied callback to be run once the semaphore is available.
The callback will receive the current value of the semaphore as its argument.
The function may return a promise. Once the promise is resolved or rejected (or immediately after
execution if an immediate value was returned),
the semaphore is released. `runExclusive` returns a promise that adopts the state of the function result.

The semaphore is released and the result rejected if an exception occurs during execution
of the callback.

### Manual locking / releasing

Promise style:
```typescript
semaphore
    .acquire()
    .then(function([value, release]) {
        // ...

        release();
    });
```

async/await:
```typescript
const [value, release] = await semaphore.acquire();
try {
    // ...
} finally {
    release();
}
```

`acquire` returns an (ES6) promise that will resolve as soon as the semaphore is
available. The promise resolves to an array with the
first entry being the current value of the semaphore, and the second value a
function that must be called to release the semaphore once the critical operation
has completed.

**IMPORTANT:** Failure to call `release` will hold the semaphore locked and will
likely deadlock the application. Make sure to call `release` under all circumstances
and handle exceptions accordingly.

`acquire` / `release` should be considered a low level API. In most situations,
`runExclusive` will be a better choice that automatically takes care of releasing
the mutex once a block of code has executed exclusively.

### Checking whether the semaphore is locked

```typescript
semaphore.isLocked();
```

The semaphore is considered to be locked if it has a value of zero.

### Cancelling pending locks.

Pending locks can be cancelled by calling `cancel()` on the sempahore. This will reject
all pending locks with `E_CANCELED`:

Promise style:
```typescript
import {E_CANCELED} from 'async-mutex';

semaphore
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_CANCELED) {
            // ...
        }
    });
```

async/await:
```typescript
import {E_CANCELED} from 'async-mutex';

try {
    await semaphore.runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_CANCELED) {
        // ...
    }
}
```

This works with `aquire`, too:
if `acquire` is used for locking, the resulting promise will reject with `E_CANCELED`.

The error that is thrown can be customized by passing a different error to the `Semaphore`
constructor:

```typescript
const semaphore = new Semaphore(2, new Error('fancy custom error'));
```

Note that while all pending locks are cancelled, any currently held locks will not be
revoked. In consequence, the semaphore may not be available even after `cancel()` has been called.

### Waiting until the semaphore is available

You can wait until the semaphore is available without locking it by calling `waitForUnlock()`.
This will return a promise that resolve once the semaphore can be acquired again. This operation
will not lock the semaphore, and there is no gurantee that the semaphore will still be available
once a async barrier has been encountered.

Promise style:
```typescript
semaphore
    .waitForUnlock()
    .then(() => {
        // ...
    });
```

Async/await:
```typescript
await semaphore.waitForUnlock();
// ...
```

## Limiting the time waiting for a mutex or semaphore to become available

Sometimes it is desirable to limit the time a program waits for a mutex or
semaphore to become available. The `withTimeout` decorator can be applied
to both semaphores and mutexes and changes the behavior of `acquire` and
`runExclusive` accordingly.

```typescript
import {withTimeout, E_TIMEOUT} from 'async-mutex';

const mutexWithTimeout = withTimeout(new Mutex(), 100);
const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100);
```

The API of the decorated mutex or semaphore is unchanged.

The second argument of `withTimeout` is the timeout in milliseconds. After the
timeout is exceeded, the promise returned by `acquire` and `runExclusive` will
reject with `E_TIMEOUT`. The latter will not run the provided callback in case
of an timeout.

The third argument of `withTimeout` is optional and can be used to
customize the error with which the promise is rejected.

```typescript
const mutexWithTimeout = withTimeout(new Mutex(), 100, new Error('new fancy error'));
const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100, new Error('new fancy error'));
```

### Failing early if the mutex or semaphore is not available

A shortcut exists for the case where you do not want to wait for a lock to
be available at all. The `tryAcquire` decorator can be applied to both mutexes
and semaphores and changes the behavior of `acquire` and `runExclusive` to
immediately throw `E_ALREADY_LOCKED` if the mutex is not available.

Promise style:
```typescript
import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex';

tryAcquire(semaphoreOrMutex)
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_ALREADY_LOCKED) {
            // ...
        }
    });

```

async/await:
```typescript
import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex';

try {
    await tryAcquire(semaphoreOrMutex).runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_ALREADY_LOCKED) {
        // ...
    }
}

```

Again, the error can be customized by providing a custom error as second argument to
`tryAcquire`.

# License

Feel free to use this library under the conditions of the MIT license.